Compare commits

..

57 commits

Author SHA1 Message Date
8c43b4b406 OIDC authentication: provide link to view/edit user data 2025-09-01 08:54:08 +02:00
e079ef6747 OIDC authentication: minor fixes 2025-08-31 09:31:49 +02:00
ec97d4f82b provide tests.util package, starting with setup_logging() 2025-08-22 16:00:34 +02:00
9f0eaa8675 config.py: remove obsolete setting 2025-08-22 12:30:39 +02:00
287b62f7b8 minor fixes 2025-08-09 15:23:40 +02:00
bc2b0aaa2c minor config / testing fixes 2025-08-08 23:16:35 +02:00
d5616b207e add simple (dummy) test for external (zitadel) user creation 2025-08-08 10:10:16 +02:00
94365602ca OIDC auth: directly use sub from JWT if appropriate; optionally add groups from calling application 2025-08-06 17:51:15 +02:00
775603046a org.user: save intermediate state (create / update grants not working: 404 Not Found) 2025-08-06 09:45:56 +02:00
3e43c25d84 org.user, web.client: user update on zitadel working 2025-08-04 18:39:18 +02:00
7427370b5c org.user, web.client: user creation via zitadel API basically working 2025-07-31 17:24:17 +02:00
c23069a3c1 work in progress. send user data to identity provider: basic fixes, prepare tests 2025-07-26 17:43:58 +02:00
2698a578df work in progress: send user data to external identity provider (zitadel) 2025-07-25 19:53:26 +02:00
626ff6e673 move API client authentication to auth.oidc 2025-07-22 19:07:02 +02:00
b425462f12 API authentication on zitadel server working 2025-07-22 09:41:24 +02:00
1b58c7fb22 rename organize to org; work in progress: user management with access to auth provider 2025-07-21 11:07:51 +02:00
722b258103 fix base_url references 2025-05-14 17:02:23 +02:00
05499d5d41 auth: use form['camefrom'] as requested URI, store in initial session and use for final redirect 2025-05-14 16:25:34 +02:00
2f87493144 auth: fully functional logout 2025-05-14 09:03:15 +02:00
a2c529e6d3 storage test: save session message, may be used as session storage 2025-05-14 09:02:38 +02:00
b55191dab3 storage.message: definitions, start with tests 2025-04-30 18:08:45 +02:00
ee5a76a808 auth improvements (JWT stuff, esp tests); new: storage.message (event store) 2025-04-30 16:42:40 +02:00
5eb9531997 auth: directly use user data from id_token (no user_info request) 2025-04-29 17:36:02 +02:00
99f717a816 provide some example data + code (for JWT encoding/decoding) in 'scratch' module 2025-04-29 15:05:03 +02:00
258baa88b2 auth fixes (fetch keys via request); collect responses for testing in separate file 2025-04-29 09:18:57 +02:00
6857601ab8 work in progress: oidc auth tests 2025-04-28 15:29:21 +02:00
01fc7d2874 auth, work in progress: decode id_token, + other improvements 2025-04-25 20:34:52 +02:00
87310b9798 auth: improve loading of oidc provider data, provide and check in test 2025-04-23 09:10:43 +02:00
b2d1c7888b work in progress: dummy oidc data handler for testing 2025-04-22 11:10:43 +02:00
f21910e675 auth: get OIDC provider URIs, endpoints, and keys from OP config URLs 2025-04-21 11:53:23 +02:00
96afb631e0 rename server to web 2025-04-20 16:42:13 +02:00
bba081156b create package server.auth with module oidc (former server.auth module) 2025-04-20 11:10:05 +02:00
67985a6bdb auth: logout: start implementation (expire cookie) 2025-04-20 10:42:42 +02:00
d128c5f138 auth: use logger.debug instead of print 2025-04-16 16:51:32 +02:00
1918183c59 basic logging set-up 2025-04-16 10:54:25 +02:00
cae934c2d7 fix pyproject: +requests 2025-04-15 18:33:37 +02:00
4b791cf83b auth: principal with correct groups => login and auth basically working 2025-04-07 10:21:25 +02:00
2a52d8a481 auth: user info -> principal 2025-04-06 22:39:10 +02:00
35cf8884bf auth: unauthorized: call login() 2025-04-05 17:33:36 +02:00
8d3ff5b667 auth: store user data in cookie, retrieve in authenticate() 2025-04-05 12:31:26 +02:00
7bca60e74c auth: basic OIDC flow with cookie encryption and final redirect working 2025-04-04 16:48:53 +02:00
ec80be5f97 use waitress as http server; provide simple shell script; auth improvments 2025-03-27 22:35:19 +01:00
f911dbf590 oidc auth: get rid of pyoidc (oid) - provide random and crypt functionality in scopes.util 2025-03-27 08:27:02 +01:00
950fcb4174 oidc auth: login and retrieval of user data basically working 2025-03-26 18:15:38 +01:00
c1f07effee oidc auth: improvements, store info in cookie 2025-03-26 15:43:50 +01:00
0207d12b46 work in progress: oidc auth - redirect to oidc provider OK, start processing callback 2025-03-25 11:08:14 +01:00
87c0c1db2e work in progress: oidc authentication: start login processing 2025-03-24 22:26:17 +01:00
3e25b5e593 work in progress: entry points for OpenID Connect (oidc) authentication 2025-03-24 12:04:53 +01:00
b4051147ee server.browser.register: allow for explicit string prefixes as context types 2025-03-22 11:02:36 +01:00
f04297d570 provide DummyMailDelivery for testing; fix auth 2025-03-09 09:31:59 +01:00
efd47419a0 allow additional db parameters when setting up storage 2025-02-11 11:31:56 +01:00
1eff3d2c8b fix auth: don't raise Unauthorized 2024-11-22 11:34:40 +01:00
eaa2055c76 work in progress: JWT authentication: baseAuth as property, remove registration function 2024-11-16 09:27:01 +01:00
a3da3f07f0 work in progress: JWT authentication 2024-11-15 08:41:42 +01:00
ab050ba360 move tests into scopes package, provide runtests.sh calling zope-testrunner 2024-09-30 09:50:58 +02:00
c9cb428707 fix tests: avoid db locked error with sqlite3 2024-09-20 15:21:56 +02:00
a4c24a44d8 avoid 'db locked' error with sqlite on first run: provide additional commit() 2024-08-17 11:26:04 +02:00
41 changed files with 1068 additions and 111 deletions

2
.gitignore vendored
View file

@ -2,12 +2,14 @@
*.pyo
*.egg-info
*.project
*.log
*.swp
*.pydevproject
*.sublime-project
*.sublime-workspace
*.ropeproject
.env
.private*
.pytest.ini
*#*#
*.#*

48
demo/config.py Normal file
View file

@ -0,0 +1,48 @@
# py-scopes/demo/config.py
from dotenv import load_dotenv
import logging
from os import getenv
from scopes.web.app import zope_app_factory
load_dotenv()
log_file = 'log/scopes.log'
log_level = logging.DEBUG
log_format = '%(asctime)s %(levelname)s %(name)s %(message)s'
log_dateformat = '%Y-%m-%dT%H:%M:%S'
logging.basicConfig(filename=log_file, level=log_level,
format=log_format, datefmt=log_dateformat)
server_port = getenv('SERVER_PORT', '8099')
base_url = getenv('BASE_URL', 'https://demo.cy7.de')
app_factory = zope_app_factory
# storage settings
from scopes.storage.db.postgres import StorageFactory
dbengine = 'postgresql+psycopg'
dbname = getenv('DBNAME', 'demo')
dbuser = getenv('DBUSER', 'demo')
dbpassword = getenv('DBPASSWORD', 'secret')
dbschema = getenv('DBSCHEMA', 'demo')
# authentication settings
oidc_provider = getenv('OIDC_PROVIDER', 'https://a1.cy7.de')
oidc_client_id = getenv('OIDC_CLIENT_ID', '12345')
oidc_params = dict(
op_config_url=oidc_provider + '/.well-known/openid-configuration',
op_uris=None,
op_keys=None,
op_project_scope='urn:zitadel:iam:org:project:id:zitadel:aud',
callback_url=getenv('OIDC_CALLBACK_URL', base_url + '/auth/callback'),
client_id = oidc_client_id,
cookie_name=getenv('OIDC_COOKIE_NAME', 'oidc_' + oidc_client_id),
cookie_domain=getenv('OIDC_COOKIE_DOMAIN', None),
cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'),
cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None),
private_key_file=getenv('OIDC_SERVICE_USER_PRIVATE_KEY_FILE', '.private-key.json'),
organization_id=getenv('OIDC_ORGANIZATION_ID', '12346'),
project_id=getenv('OIDC_PROJECT_ID', None),
)

21
demo/demo_server.py Normal file
View file

@ -0,0 +1,21 @@
# py-scopes/demo/demo_server.py
from scopes.web.auth import oidc
from scopes.storage import topic
import logging
import waitress
def run(app, config):
oidc.startup() # todo: use generic app.startServices()
port = int(config.server_port)
print(f'Serving on port {port}.')
waitress.serve(app, port=port)
if __name__ == '__main__':
import config
app = config.app_factory(config)
run(app, config)
# see zope.app.wsgi.getWSGIApplication

17
demo/env.in Normal file
View file

@ -0,0 +1,17 @@
# s10: py-scopes/demo/.env
# input (example) file - copy to .env and edit.
SERVER_PORT=8800
BASE_URL=https://demo.cy7.de
DBNAME=demo
DBUSER=demo
DBPASSWORD=secret
DBSCHEMA=demo
OIDC_PROVIDER=
OIDC_CLIENT_ID=
OIDC_COOKIE_CRYPT=
OIDC_ORGANIZATION_ID=
OIDC_PROJECT_ID=

2
demo/log/README.md Normal file
View file

@ -0,0 +1,2 @@
directory for logfiles created by application

11
demo/shell.py Normal file
View file

@ -0,0 +1,11 @@
# scopes/demo/shell.py
# simple shell for interactive testing / accessing the database (storage)
# use: `python -i shell.py`
import config
from scopes.web.auth import oidc
from scopes.storage.folder import Root
from scopes.storage import topic
storage = config.StorageFactory(config)(config.dbschema)
root = Root(storage)

43
pyproject.toml Normal file
View file

@ -0,0 +1,43 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "py-scopes"
version = "3.0.1"
description = "Implementation of the unknown 'scopes' paradigm in Python"
readme = "README.md"
license = {text = "MIT"}
keywords = ["scopes"]
authors = [{name = "Helmut Merz", email = "helmutm@cy55.de"}]
dependencies = [
"SQLAlchemy",
]
[project.optional-dependencies]
postgres = [
"psycopg[binary]",
"transaction",
"zope.sqlalchemy",
]
app = [
"python-dotenv",
"waitress",
"zope.authentication",
"zope.interface",
"zope.publisher",
"zope.traversing",
]
auth = ["pyjwt[crypto]", "cryptography", "requests"]
test = ["zope.testrunner"]
#test = ["pytest"]
[tool.setuptools]
packages = ["scopes"]
#[tool.pytest.ini_options]
#addopts = "-vv"
#python_files = "test_standard.py" # default: run only `standard` tests
# use .pytest.ini file with `python_files = test_*.py` to run all tests

5
runtests.sh Executable file
View file

@ -0,0 +1,5 @@
# runtests.sh
# run all unit / doc tests
zope-testrunner --test-path=. $*

View file

@ -1,9 +1,14 @@
# scopes.interfaces
from zope.interface import Interface
from zope.interface import Interface, Attribute
class ITraversable(Interface):
class IViewable(Interface):
prefix = Attribute('Prefix string for identifying the type (class) of an object')
class ITraversable(IViewable):
def get(key, default=None):
"""Return the item addressed by `key`; return `default` if not found."""

1
scopes/org/__init__.py Normal file
View file

@ -0,0 +1 @@
"""package scopes.org"""

16
scopes/org/mail.py Normal file
View file

@ -0,0 +1,16 @@
# scopes.org.mail
from zope.interface import implementer
from zope.sendmail.interfaces import IMailDelivery
"""Utilities for creating and sending emails."""
@implementer(IMailDelivery)
class DummyMailDelivery:
"""For testing purposes: just store mails in maildata.log"""
def send(self, fromaddr, toaddrs, message):
print("DummyMailDelivery")
print(f"fromaddr: {fromaddr}, toaddrs: {toaddrs}")
print(message)

View file

@ -1,4 +1,4 @@
# scopes.organize.task
# scopes.org.task
"""Task (and corresponding container) implementation."""

95
scopes/org/user.py Normal file
View file

@ -0,0 +1,95 @@
# scopes.org.user
"""Basic user account (principal) definitions + access to identity provider."""
from dataclasses import dataclass, field
from typing import List, Optional
from scopes.web import client
from scopes import util
import config
@dataclass
class User:
login: str
email: str
hashedPassword: Optional[str] = None
firstName: str = ''
lastName: str = ''
displayName: str = ''
groups: List[str] = field(default_factory=list)
def __post_init__(self):
if not self.displayName:
self.displayName = ' '.join((self.firstName, self.lastName))
class ExtUser:
"""All infos for exchanging user data with an external service.
This base class implements the zitadel interface (as of version 3.3.2).
For other identity providers sublass accordingly.
"""
provider = 'zitatel'
endpoints = dict(
users_human='v2/users/human',
#create_authorization='management/v1/zitadel.authorization.v2beta.AuthorizationService/CreateAuthorization',
create_authorization='v2beta/authorizations',
)
def __init__(self, user, idPrefix=''):
self.user = user
self.userId = idPrefix + user.login
self.client = client.ApiClient(config.oidc_provider)
def asDict(self):
params = config.oidc_params
data = dict(
userId=self.userId,
username=self.user.login,
email=dict(email=self.user.email, isVerified=True),
profile=dict(
givenName=self.user.firstName,
familyName=self.user.lastName,
displayName=self.user.displayName,
),
organization=dict(orgId=params['organization_id']),
)
return data
def create(self, updateIfExists=False):
data = self.asDict()
if self.user.hashedPassword:
data['hashedPassword'] = self.user.hashedPassword
status, res = self.client.post(self.endpoints['users_human'], data)
if status > 201:
if updateIfExists:
return self.update()
return status, res
#if self.user.groups:
#return self.createGroups()
def update(self, createIfMissing=False):
data = self.asDict()
if self.user.hashedPassword:
data['password'] = dict(hashedPassword=self.user.hashedPassword)
status, res = self.client.put(self.endpoints['users_human'], self.userId, data)
if status > 200:
if createIfMissing:
return self.create()
else:
return status, res
#if self.user.groups:
#return self.updateGroups()
def createGroups(self):
data = dict(
userId=self.userId,
projectId=config.oidc_params['project_id'],
roleKeys=self.user.groups,
)
return self.client.post(self.endpoints['create_authorization'], data)

View file

@ -1 +0,0 @@
"""package scopes.organize"""

View file

@ -1 +0,0 @@
"""package scopes.server"""

View file

@ -52,8 +52,8 @@ class Storage(object):
return metadata.tables.get((schema and schema + '.' or '') + tableName)
def dropTable(self, tableName):
prefix = self.schema and self.schema + '.' or ''
with self.engine.begin() as conn:
prefix = self.schema and self.schema + '.' or ''
conn.execute(text('drop table if exists %s%s' % (prefix, tableName)))
def resetSequence(self, tableName, colName, v):
@ -63,15 +63,19 @@ class Storage(object):
conn.execute(text(sq))
class StorageFactory(object):
class StorageFactory:
def sessionFactory(self):
return self.engine.connect
@staticmethod
def getEngine(dbtype, dbname, user, pw, host='localhost', port=5432, **kw):
def getEngine(dbtype, dbname, user, pw, **kw):
return create_engine('%s:///%s' % (dbtype, dbname), **kw)
def engineFromConfig(self, config):
return self.getEngine(config.dbengine, config.dbname,
config.dbuser, config.dbpassword)
@staticmethod
def mark_changed(session):
pass
@ -86,8 +90,7 @@ class StorageFactory(object):
storageClass = Storage
def __init__(self, config, storageClass=None):
self.engine = self.getEngine(config.dbengine, config.dbname,
config.dbuser, config.dbpassword)
self.engine = self.engineFromConfig(config)
self.Session = self.sessionFactory()
if storageClass is not None:
self.storageClass = storageClass

View file

@ -24,6 +24,12 @@ class StorageFactory(StorageFactory):
return create_engine('%s://%s:%s@%s:%s/%s' % (
dbtype, user, pw, host, port, dbname), **kw)
def engineFromConfig(self, config):
return self.getEngine(config.dbengine, config.dbname,
config.dbuser, config.dbpassword,
host=getattr(config, 'dbhost', 'localhost'),
port=getattr(config, 'dbport', 5432))
@staticmethod
def mark_changed(session):
return mark_changed(session)

View file

@ -7,6 +7,18 @@ from scopes.storage.common import registerContainerClass
from scopes.storage.tracking import Container, Track
class DummyFolder(dict):
prefix = 'dummy'
def asDict(self):
return self
def __repr__(self):
return '<%s: %s>' % (self.__class__.__name__,
super(DummyFolder, self).__repr__())
@implementer(IContainer, IReference)
class Folder(Track):
@ -57,6 +69,8 @@ class Root(Folder):
"""A dummy (virtual) root folder for creating real folders
using the Folder API."""
prefix = 'root'
def __init__(self, storage):
cont = storage.create(Folders)
super(Root, self).__init__(container=cont)

21
scopes/storage/message.py Normal file
View file

@ -0,0 +1,21 @@
# scopes.storage.message
"""Generic messages (or events) to be stored in SQL database."""
from scopes.storage.common import registerContainerClass
from scopes.storage.tracking import Container, Track
class Message(Track):
headFields = ['domain', 'action', 'class', 'item']
prefix = 'msg'
@registerContainerClass
class Messages(Container):
itemFactory = Message
indexes = [('domain', 'action', 'class', 'item'), ('domain', 'class', 'item')]
tableName = 'messages'
insertOnChange = True

View file

@ -20,7 +20,8 @@ class Track(object):
headFields = ['taskId', 'userName']
prefix = 'rec'
def __init__(self, *keys, **kw):
def __init__(self, *keys, data=None, timeStamp=None, trackId=None,
container=None, **kw):
self.head = {}
for k, v in kw.items():
if k in self.headFields:
@ -31,10 +32,10 @@ class Track(object):
if self.head.get(k) is None:
self.head[k] = ''
setattr(self, k, self.head[k])
self.data = kw.get('data') or {}
self.timeStamp = kw.get('timeStamp')
self.trackId = kw.get('trackId')
self.container = kw.get('container')
self.data = data or {}
self.timeStamp = timeStamp
self.trackId = trackId
self.container = container
def set(self, attr, value):
if attr in self.headFields:
@ -66,7 +67,7 @@ class Track(object):
return str(self.trackId)
def __repr__(self):
return '%s: %s' % (self.__class__.__name__, self.asDict())
return '<%s: %s>' % (self.__class__.__name__, self.asDict())
def asDict(self):
return dict(uid=self.uid, head=self.head, data=self.data,

View file

@ -1,33 +0,0 @@
# scopes/tests.py
"""The real test implementations"""
import unittest
from scopes import tlib_storage
import config
config.dbengine = 'postgresql'
config.dbname = 'ccotest'
config.dbuser = 'ccotest'
config.dbpassword = 'cco'
config.dbschema = 'testing'
# PostgreSQL-specific settings
from scopes.storage.db.postgres import StorageFactory
config.storageFactory = StorageFactory(config)
#storage = factory(schema='testing')
class Test(unittest.TestCase):
def test_001_tracking(self):
tlib_storage.test_tracking(self, config)
def test_002_folder(self):
tlib_storage.test_folder(self, config)
def test_003_type(self):
tlib_storage.test_type(self, config)
def test_004_topic(self):
tlib_storage.test_topic(self, config)

1
scopes/tests/__init__.py Normal file
View file

@ -0,0 +1 @@
# py-scopes/tests

52
scopes/tests/config.py Normal file
View file

@ -0,0 +1,52 @@
# py-scopes/tests/config.py
import logging
from os import getenv
import sys
#from scopes.web.app import demo_app, zope_app
log_file = 'scopes/tests/log/scopes-test.log'
log_level = logging.INFO
log_format = '%(asctime)s %(levelname)s %(name)s %(message)s'
log_dateformat = '%Y-%m-%dT%H:%M:%S'
# server / app settings
server_port = '8999'
base_url = 'testing:'
#app = zope_app
# storage settings
# SQLite
dbengine = 'sqlite'
dbname = 'var/test.db'
dbuser = None
dbpassword = None
dbschema = None
# special testing stuff
from scopes.tests import data_auth # add oidc URIs and keys to dummy_requests data
from scopes.tests import dummy_requests
sys.modules['requests'] = dummy_requests
# authentication settings
oidc_provider = 'test://oidc'
oidc_client_id = getenv('OIDC_CLIENT_ID', '12345')
oidc_params = dict(
op_config_url=oidc_provider + '/.well-known/openid-configuration',
op_uris=None,
op_keys=None,
op_project_scope='urn:zitadel:iam:org:project:id:zitadel:aud',
callback_url=getenv('OIDC_CALLBACK_URL', base_url + '/auth/callback'),
client_id=oidc_client_id,
principal_prefix=getenv('OIDC_PRINCIPAL_PREFIX', 'loops.'),
cookie_name=getenv('OIDC_COOKIE_NAME', 'oidc_' + oidc_client_id),
cookie_domain=getenv('OIDC_COOKIE_DOMAIN', None),
cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'),
cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None),
private_key_file=getenv('OIDC_SERVICE_USER_PRIVATE_KEY_FILE',
'scopes/tests/test-private-key.json'),
organization_id=getenv('OIDC_ORGANIZATION_ID', '12346'),
project_id=getenv('OIDC_PROJECT_ID', None),
)

37
scopes/tests/data_auth.py Normal file
View file

@ -0,0 +1,37 @@
# scopes.tests.data_auth
"""provide response data for testing (via dummy_requests)"""
from cryptography.hazmat.primitives.asymmetric import rsa
from scopes import util
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()
public_key_n = util.b64e(public_key.public_numbers().n.to_bytes(256)).decode('ASCII')
oidc_data = {
'test://oidc/.well-known/openid-configuration': {
"issuer": "test://oidc",
"authorization_endpoint": "test://oidc/oauth/v2/authorize",
"token_endpoint": "test://oidc/oauth/v2/token",
"introspection_endpoint": "test://oidc/oauth/v2/introspect",
"userinfo_endpoint": "test://oidc/oidc/v1/userinfo",
"revocation_endpoint": "test://oidc/oauth/v2/revoke",
"end_session_endpoint": "test://oidc/oidc/v1/end_session",
"device_authorization_endpoint": "test://oidc/oauth/v2/device_authorization",
"jwks_uri": "test://oidc/oauth/v2/keys"},
'test://oidc/oauth/v2/keys': { "keys": [
{"use": "sig",
"kty": "RSA",
"kid": "316766976250797901",
"alg": "RS256",
"n": public_key_n,
"e": "AQAB"}]},
'test://oidc/oauth/v2/token': {
"access_token": "abcde12345"},
'test://oidc/v2/users/human': {
"code": 1}
}
from scopes.tests.dummy_requests import response_data
response_data.update(oidc_data)

View file

@ -0,0 +1,30 @@
# scopes.tests.requests
"""Dummy requests implementation for testing."""
from logging import getLogger
logger = getLogger('tests.dummy_requests')
def get(url, *args, **kw):
logger.info(f'get: %s - %s - %s', url, args, kw)
return FakeResponse(response_data[url])
def post(url, *args, **kw):
logger.info(f'post: %s - %s - %s', url, args, kw)
return FakeResponse(response_data[url])
class FakeResponse:
def __init__(self, data):
self.data = data
@property
def status_code(self):
return 200
def json(self):
return self.data
response_data = {}

View file

@ -0,0 +1 @@
directory for logfiles created by tests

36
scopes/tests/scratch.py Normal file
View file

@ -0,0 +1,36 @@
# scopes.tests.scratch
"""for keeping experimental and demonstration code and data (mainly for testing)"""
import base64
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from cryptography.utils import int_to_bytes
from scopes import util
# generate (and serialize) or load private key
# generate public JWKS (key set) from private ky
#private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
#pem = private_key.private_bytes(encoding=serialization.Encoding.PEM,
# format=serialization.PrivateFormat.PKCS8,
# encryption_algorithm=serialization.NoEncryption())
pem = b'-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQD1ipW1hnV+OWF/\nE09YEQB9GTwQ0DUhmM1ZtmONV3xp1zrpLsvQtJJ6lZ8rp+ws35HGEEQWuBvgKP+K\nyyhILkgHp5LUtXsIruvo/+20Gbxr3bbIp2Omv8NuUnQIlb0SkZwauz/iLi+dFnoW\nTzkCwRnOPG+xqrSzD4CUCIZc4fU61h457I5PWTgK+bZXlsPytook9HpCIPX+9PaU\n935KHeWLonx3GZWqcqUyHSUn8dgRMypaKO70zCZkZWwDbCkJv7OUuY9QijN9niA+\nX3MFyqf9+WK7/lLnNYimdXk9Pz9Lim7HhoAApoW/0ueUfok54OIzuemTAUaunaN6\nmdOvh54dAgMBAAECggEAAWW5Vx7kAY1Bto104U51EEm519EvhUm6B5T4+VTp5LYK\nF1rEMIFwtBkoFfjzLJFEwEakJ8Hgq80TIRdyJ72r9ACIPGtTlH0Jjg94IqyvwOy6\n23fPEXkrLHR0Z5g8bR+6A95liQrA+tCD1QVFRAWUktlkVXfxRC95TMCA3i7rakGG\no6BbyUD2Hp7IEO5fbfKNiqrUO/LwvLOMBTe3Q9EVv7fblm21HiwWG26RgO29UtFo\nEKcaALSqIYwYCkc9EuLpcGRzLfH1opid3Xtu4+hm5enNgdXZRKr9lSQUCSi9oZRE\npszNGyDsaot2ixwKP3c6Eo3bQKMmXXw8Y3JsNiZzIQKBgQD8QuCjKyAeH0QrhW1Y\nvSmUC3q517WZPRgsEMx917MYn/Gl1ywsROjawP/ClxTZkpXVgALFvrggjB75p/QG\noFamZpf9U95fVivAkANQYmhGmqPVQUeb2vgHQOdPSdvvMoRumcEcfS/dOo9R//s+\nfrBQNYfp/tTttdyM2N0CzsuBzQKBgQD5LjX8SvLPe6q6xJchdExywbTk+DLtiKDs\nT5avaGhOqzYhir6FvEt7Gw74J6IWzIt5X4OteSkpRvj2TN0ik+KAfZOk+v9C5iK7\nxbicWj5XSaPJcYvhxogO7mb8sSrl/cY20HRkCsyQOKeH005PHsGZztZoHrsF6t2X\nbtKgNWp9kQKBgHLDHh09RlxNzx6Zkfh3/k1qt4eKmgQ/5hpN/ioWElVWloHjFSaC\npwi2GuT1BLhC1sWNejVqIaw08vaTMRI+qY0ESYsnN5hZxIfTPJ66VkQgn/4pt6Ex\nCfuKzHCm4la8vcDvVApY7YiQ1pjwguWYjy++WrnahBYs0UyGcG2RlMXVAoGBAJNJ\nf1ubqZ5+yNIQ9gwuRCno2dYl52SESCqmeLlCC7XEegCllCxUuoEP429HbgXv7dlW\nXe0iGvRtISflEyknJNEyaR0xx8RxZ8J6Ar9YkFTkEE44MajIww+gV3ux9Vtw/8LS\nwJmJ0JTHCC++9SDLW0BhBFcTIxVCWKz0MsfECyghAoGAOLm2dF59JiiiynGiWXzJ\nZcbZQStR+ysjLX+RqTATAorhweUiv+HwbB4boWhOnDD7q3t+93GnzeqI9m9Q5yo6\nRTyyNYa4uJXMMarZxi3m+RGvuCC2h+7ja6H6UGg3xrHNwcykJkuRAZvqCN+f6Nxo\njc8o9LMimQyTv+yptrOhi2w=\n-----END PRIVATE KEY-----\n'
private_key = serialization.load_pem_private_key(pem, password=None)
public_key = private_key.public_key()
nums = public_key.public_numbers()
#e = base64.urlsafe_b64encode(int_to_bytes(nums.e)).rstrip(b'=')
#e = base64.urlsafe_b64encode(nums.e.to_bytes(3)).rstrip(b'=')
e = b'AQAB'
#n = base64.urlsafe_b64encode(int_to_bytes(nums.n)).rstrip(b'=')
n = util.b64e(nums.n.to_bytes(256))
# data from example session with zitadel
keys_response = {"keys":[{"use":"sig","kty":"RSA","kid":"317781576895225677","alg":"RS256","n":"rBE2kf4S1-uLqTKoRcxmBDZXeNU0PcDZtXX3pYK4hbE_cVNqMnWZQNXh4asLvZ_VpInIRI_gRe5pFfrUfnFppnu7sgUGPHS_i6E1JSDWDDViwU0DbQXzTMA36xb8fI9vm_StrKqQ2snpkV2YprWC597vOa6w8eWMEfwWV39gmpgjyaYfwKi2ldvnn_djYSEyy6Cs8QW9L8uaTKDYUuuIY7iyjTRI7kM1boZcg26WQ4nDgSU8BMEmPGc-h7BHIm3PVEKNBUU3dRIcvIlnAx7AJoqdmUU0BllMGOd-To7PP760Vx4gYBsTFRGaQ_9P7vRApn0Wywnhj0BaVWGyJPkggQ","e":"AQAB"}]}
token_response = {'access_token': 'WGn4xjOluOgEugwqfxACziBZxAxQ1SKBKU6p27DysWw9slM55Fn7wkz_avpy5BjFR1uU_2rJoQTMNsO8NyxOEkmhbpP9xURUQrLRBiZT', 'token_type': 'Bearer', 'expires_in': 43199, 'id_token': 'eyJhbGciOiJSUzI1NiIsImtpZCI6IjMxNzc4MTU3Njg5NTIyNTY3NyIsInR5cCI6IkpXVCJ9.eyJhbXIiOlsicHdkIl0sImF0X2hhc2giOiJQbC1kMjBvRG9DRHZqTUtGMmF4S3N3IiwiYXVkIjpbIjMxMTYxMzExOTgxNjM5MjUyNSIsIjMxNDQ2OTIwOTA3OTkzMDcwMSIsIjMxMTQ3MjQ1MTc2ODg2ODY4NSJdLCJhdXRoX3RpbWUiOjE3NDU5MTAxNTksImF6cCI6IjMxMTYxMzExOTgxNjM5MjUyNSIsImNsaWVudF9pZCI6IjMxMTYxMzExOTgxNjM5MjUyNSIsImVtYWlsIjoiaGVsbXV0Lm1lcnpAcG9zdGVvLmRlIiwiZW1haWxfdmVyaWZpZWQiOnRydWUsImV4cCI6MTc0NTk1MzUwOSwiZmFtaWx5X25hbWUiOiJVc2VyIiwiZ2VuZGVyIjoibWFsZSIsImdpdmVuX25hbWUiOiJUZXN0IiwiaWF0IjoxNzQ1OTEwMzA5LCJpc3MiOiJodHRwczovL2ExLmN5Ny5kZSIsImxvY2FsZSI6ImRlIiwibmFtZSI6IlRlc3QgVXNlciIsIm5vbmNlIjoiMVJGem9RNDkyazNWTFEzcyIsInByZWZlcnJlZF91c2VybmFtZSI6InRzdDkiLCJzaWQiOiJWMV8zMTc3ODQyMjY2MjE2MTE4NTMiLCJzdWIiOiIzMTE4NDY3OTY3Mzk1MzQ2NjkiLCJ1cGRhdGVkX2F0IjoxNzQ1MzA1MTkzLCJ1cm46eml0YWRlbDppYW06b3JnOnByb2plY3Q6MzExNDcyNDUxNzY4ODY4Njg1OnJvbGVzIjp7ImNjLXVzZXIiOnsiMzExNDczNTAyMjc0MjQ4NTI1IjoiY3liZXJjb25jZXB0cy5hMS5jeTcuZGUifX0sInVybjp6aXRhZGVsOmlhbTpvcmc6cHJvamVjdDpyb2xlcyI6eyJjYy11c2VyIjp7IjMxMTQ3MzUwMjI3NDI0ODUyNSI6ImN5YmVyY29uY2VwdHMuYTEuY3k3LmRlIn19LCJ1cm46eml0YWRlbDppYW06dXNlcjpyZXNvdXJjZW93bmVyOmlkIjoiMzExNDczNTAyMjc0MjQ4NTI1IiwidXJuOnppdGFkZWw6aWFtOnVzZXI6cmVzb3VyY2Vvd25lcjpuYW1lIjoiY3liZXJjb25jZXB0cyIsInVybjp6aXRhZGVsOmlhbTp1c2VyOnJlc291cmNlb3duZXI6cHJpbWFyeV9kb21haW4iOiJjeWJlcmNvbmNlcHRzLmExLmN5Ny5kZSJ9.hx8JWXWVPq-WfBx9AMgFRWaEX3CioU8BCZwT3d8YETMejsR33LDsbPfnzO0Dbw885tiyNIsdNRj7lfk5HtuZVWdHx58hnaMw-czGhIfB0GXvuh2UFemBoLNJLajtRvCBi-YIZ_c6TD0Ryd2wn_yQlgY1J2R8xTsBWeKM_SYqlKU20r4r3nsX4krg6Q9x-Kc3k0Rg3lMXoxFYgQSzXrKjxTI3HHb03H4lzgmrdcbQqfbLoHKdvGxzfo-Gf8Rlt9rQ-fpGu7fRfPJQV14SPJ6CQtKP0adq_D2blcp1I1xKXML8TytJa05RjL_l_KH-segFfORtGMc4mYo8JUpMW0KOvg'}
token_id_claims = {'amr': ['pwd'], 'at_hash': 'Pl-d20oDoCDvjMKF2axKsw', 'aud': ['311613119816392525', '314469209079930701', '311472451768868685'], 'auth_time': 1745910159, 'azp': '311613119816392525', 'client_id': '311613119816392525', 'email': 'helmut.merz@posteo.de', 'email_verified': True, 'exp': 1745953509, 'family_name': 'User', 'gender': 'male', 'given_name': 'Test', 'iat': 1745910309, 'iss': 'https://a1.cy7.de', 'locale': 'de', 'name': 'Test User', 'nonce': '1RFzoQ492k3VLQ3s', 'preferred_username': 'tst9', 'sid': 'V1_317784226621611853', 'sub': '311846796739534669', 'updated_at': 1745305193, 'urn:zitadel:iam:org:project:311472451768868685:roles': {'cc-user': {'311473502274248525': 'cyberconcepts.a1.cy7.de'}}, 'urn:zitadel:iam:org:project:roles': {'cc-user': {'311473502274248525': 'cyberconcepts.a1.cy7.de'}}, 'urn:zitadel:iam:user:resourceowner:id': '311473502274248525', 'urn:zitadel:iam:user:resourceowner:name': 'cyberconcepts', 'urn:zitadel:iam:user:resourceowner:primary_domain': 'cyberconcepts.a1.cy7.de'}

View file

@ -0,0 +1 @@
{"type":"serviceaccount","keyId":"314794985486606157","key":"-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBAAKCAQEA0dC8wcwu6Uefxx/shqsSTk//ATseeCy28RMAEa4NFGj/y8Ju\nOfVUj7pB5+6onjmsBAHXhCJ+fEWWAJdHnbvywrBNNhVx38f8v+90zUP2IzlT1UDp\ncTIYaehnf3+uqwgMcijnYJ6UgaHFMSecxnYD4adnw8J/FEMDgy2N+v5krp989VQ5\nT2kgrkb/l5z8dgLhmmcLKm7YCG1uXXP+g+qzEZ9Uhur5b+czjIalzC/tq2V2JoJB\nooH9w1iaRXRKel7FZPo0YGyQh/0a9Zn5JsXVc3YTHTKh9madr/yQqmk+6siTl/Ou\ntz9mvpY+AfFRaIWikoyB3W9rHd0b6WtQPflEPwIDAQABAoIBAAN64daZC2IlJPpJ\nhkPJjJkt7H3ZvCykGTiwZvzkFSV0hGGdzPQ7JHbp0PQG2lcdf8PlP+zaIZzwDofd\n+nscRe+CuxUdj/D1QTTxxM8uxGNbLQ/JbtXIzezbxPOxa3U8wfAWy5enqbDovPuO\nu6PzCydv/mGZ1T/ByMohNEyocYUP6mupHWwf2hN/lnrL264w8uvNjAw0xDtbtBJN\nX61u6vi/fiY37qKblN3irAePwK4LIhHZZoyJ1HrFYIkFf0Bviuzpw/ASVqbjizPV\nmTxGxghiQacAMvSSe+pcfJ7ip74rCFv7+6pzL+yW8df1lbSM9vS+86SDgY9RCc2E\n3h1/hUECgYEA/WqiWNXey25qCNB6WHo3SU5cZIZVNWzsT1zkwkXOUtEyU0/zEfT+\nEjW/vbxIBgZNV1tX2aXd7Ke5OCoQ1dqLnmDoO5d13xTeaWN3FR8ibTwbaDCwyg5d\njyIXK2k7IwtcpJFgJFGM/6udAdO/bPm1IPEslJXHBqZoGrKb+bTw6N8CgYEA0/RQ\nHtQluQYBtXNzEql0MaxBUxfHkwjL6Yo6dM+EJAomI+cccVy22s+z2aQX5GVQnbzs\nm9BGkJzzn7eGPy3i2LgStqUZ2W7VqfIJNCIDbC7OxBAaszh5/LEgv5pfp1Yr/HIf\nwHZz53rdV8H+oUfMJdlyrRyGOeGIDZCd94nTMKECgYAQOpT9BW1IL+EAgYFkSydh\nPXBzS5sHWdtkVbmcq2XELfuAFF2np73hoqmN2BHwuNSZJJNir9mffzpAW4lKeL16\nPhCBSHjW+Xoo26LTqnPE9RV4Pa4EspjRQsijEhEkdGTRcTHsAYD7Gp1qcYoPy4oK\n+wb02Qau6Vc/ZnLQsgK/lwKBgQDMLSGxUPQ11E95GAnWBF7mKuWSwemC/opQItRF\nClJk1VIAa/W+Tm3nQwYhti0920tZaFEVmAEh9c/KH+S2n+FSm5+LSmgoSNiSqZGs\nIsfhQwXzYQAXfWQlxAukB3X1oNEmkll78Z+dcYIfs8UyYBOMsngBwuSahWOmjZVe\ni+phgQKBgC0ozpbIcNg48M4/Rrev3qJB7XlU74MySsFJdBhlrzmK3+z02bXWbyaJ\nzQLwC6Dorw0PcWAKtcJcbBn6ZAoptcmG6wdQrYk1IC+82TDcNvAFL06y8OXHYLtu\ni5AiE4nK1waoDF/1I66VACyKI6hhISRW3bKaxHhrx5OsGKVurF4R\n-----END RSA PRIVATE KEY-----\n","expirationDate":"9999-12-31T23:59:59Z","userId":"311889729668833101"}

View file

@ -0,0 +1,47 @@
# scopes.tests.test_postgres
"""Tests for the 'scopes.storage' package - using PostgreSQL."""
import os, sys
sys.path = [os.path.dirname(__file__)] + sys.path
import unittest
from scopes.tests import tlib_storage
from scopes.tests.util import setup_logging
from scopes.storage.db.postgres import StorageFactory
import config
setup_logging(config)
config.dbengine = 'postgresql+psycopg'
config.dbname = 'testdb'
config.dbuser = 'testuser'
config.dbpassword = 'secret'
config.dbschema = 'testing'
config.storageFactory = StorageFactory(config)
class Test(unittest.TestCase):
def test_001_tracking(self):
tlib_storage.test_tracking(self, config)
def test_002_folder(self):
tlib_storage.test_folder(self, config)
def test_003_type(self):
tlib_storage.test_type(self, config)
def test_004_topic(self):
tlib_storage.test_topic(self, config)
def test_005_message(self):
tlib_storage.test_message(self, config)
def test_suite():
return unittest.TestSuite((
unittest.TestLoader().loadTestsFromTestCase(Test),
))
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')

View file

@ -0,0 +1,48 @@
# scopes.tests.test_standard
"""Tests for the 'scopes.storage' package."""
import os, sys
sys.path = [os.path.dirname(__file__)] + sys.path
import unittest
from scopes.tests import tlib_web, tlib_storage
from scopes.tests.util import setup_logging
from scopes.storage.common import StorageFactory
import config
setup_logging(config)
config.dbengine = 'sqlite'
config.dbname = 'var/test.db'
config.dbschema = None
config.storageFactory = StorageFactory(config)
class Test(unittest.TestCase):
def test_001_tracking(self):
tlib_storage.test_tracking(self, config)
def test_002_folder(self):
tlib_storage.test_folder(self, config)
def test_003_type(self):
tlib_storage.test_type(self, config)
def test_004_topic(self):
tlib_storage.test_topic(self, config)
def test_013_web(self):
tlib_web.test_app(self, config)
tlib_web.test_auth(self, config)
tlib_web.test_user_data(self, config)
def suite():
return unittest.TestSuite((
unittest.TestLoader().loadTestsFromTestCase(Test),
))
if __name__ == '__main__':
unittest.main(defaultTest='suite')

View file

@ -1,9 +1,9 @@
# scopes/tlib_storage.py
# tests/tlib_storage.py
"""Test implementation for the `scopes.storage` package."""
from datetime import datetime
from scopes.storage import concept, folder, topic, tracking
from scopes.storage import concept, folder, message, topic, tracking
def test_tracking(self, config):
@ -82,7 +82,7 @@ def test_type(self, config):
concept.setupCoreTypes(storage)
types = storage.getContainer(concept.Type)
tps = list(types.query())
self.assertEqual(len(tps), 6)
self.assertEqual(len(tps), 7)
tfolder = types.queryLast(name='folder')
fldrs = list(tfolder.values())
@ -94,6 +94,8 @@ def test_type(self, config):
def test_topic(self, config):
storage = config.storageFactory(config.dbschema)
storage.dropTable('rels')
rels = storage.getContainer(concept.Triple)
storage.dropTable('topics')
topics = storage.getContainer(topic.Topic)
types = storage.getContainer(concept.Type)
@ -112,6 +114,7 @@ def test_topic(self, config):
title='Programming Languages',
description='Programming Languages'))
topics.save(tp_proglang)
#storage.commit() # avoid "database locked" error with sqlite
tp_itc.addChild(tp_proglang)
c = list(tp_itc.children())
@ -119,3 +122,15 @@ def test_topic(self, config):
storage.commit()
def test_message(self, config):
storage = config.storageFactory(config.dbschema)
storage.dropTable('messages')
messages = storage.create(message.Messages)
m01 = message.Message('system', 'data', 'session', 'V1_317784226621611853')
m01.update(dict(userid='tst9'))
mid01 = messages.save(m01)
storage.commit()

49
scopes/tests/tlib_web.py Normal file
View file

@ -0,0 +1,49 @@
# tests/tlib_web.py
"""Test implementation for the `scopes.web` package."""
import json
import logging
from zope.publisher.browser import TestRequest
from zope.publisher.publish import publish
from scopes.web.app import Publication
from scopes.storage.folder import Root
logger = logging.getLogger('tlib_web')
def publishRequest(config, storage, path):
appRoot = Root(storage)
request = TestRequest(environ=dict(PATH_INFO=path))
request.setPublication(Publication(appRoot))
request = publish(request, False)
return request.response
def test_app(self, config):
storage = config.storageFactory(config.dbschema)
response = publishRequest(config, storage, '/top')
logger.info('test_app: response %s %s', response.getStatus(), response.getHeaders())
result = json.loads(response.consumeBody())
self.assertEqual(result['items'][0]['head']['name'], 'level2-item1')
def test_auth(self, config):
from scopes.web.auth import oidc
oidc.startup() # todo: use generic app.startServices()
self.assertEqual(len(config.oidc_params['op_uris']), 8)
storage = config.storageFactory(config.dbschema)
response = publishRequest(config, storage, '/top/auth/login')
headers = dict(response.getHeaders())
logger.info('test_auth: response %s %s', response.getStatus(), headers)
self.assertEqual(response.getStatus(), 302)
uri = config.oidc_params['op_uris']['jwks_uri']
keys = oidc.loadOidcKeys(uri)
logger.info('test_auth keys: %s', keys)
def test_user_data(self, config):
from scopes.org import user
u1 = user.User(login='tst9', email='tst9@example.com',
firstName='Test', lastName='User')
xu1 = user.ExtUser(u1, idPrefix='test.')
xu1.create(updateIfExists=True)

11
scopes/tests/util.py Normal file
View file

@ -0,0 +1,11 @@
# scopes.tests.util
import logging
def setup_logging(config):
hdlr = logging.getLogger().handlers[-1]
logging.getLogger().removeHandler(hdlr) # remove NullHandler added by testrunner
logging.basicConfig(filename=config.log_file, level=config.log_level,
format=config.log_format, datefmt=config.log_dateformat)

26
scopes/util.py Normal file
View file

@ -0,0 +1,26 @@
# scopes.util
import base64
import hashlib
from secrets import choice
import string
# random strings, hashes, encodings
# for authentication, encryption, and other purposes
BASE = string.ascii_letters + string.digits
BASE2 = BASE + '-._~'
def rndstr(size=16):
return ''.join([choice(BASE) for _ in range(size)])
def rndstr2(size=64):
return ''.join([choice(BASE2) for _ in range(size)])
def b64e(b):
return base64.urlsafe_b64encode(b).rstrip(b'=')
def hashS256(s):
h = hashlib.sha256(s.encode('ascii')).digest()
return b64e(h).decode('ascii')

1
scopes/web/__init__.py Normal file
View file

@ -0,0 +1 @@
"""package scopes.web"""

View file

@ -1,22 +1,31 @@
# scopes.server.app
# scopes.web.app
import logging
from zope.i18n.interfaces import IUserPreferredCharsets
from zope.interface import implementer
from zope.publisher.base import DefaultPublication
from zope.publisher.browser import BrowserRequest
from zope.publisher.interfaces import NotFound
from zope.publisher.publish import publish
from scopes.interfaces import ITraversable, IView
from scopes.server.browser import getView
from scopes.web.browser import getView
import scopes.storage.concept # register container classes
from scopes.storage.folder import Root
@implementer(IUserPreferredCharsets)
class Request(BrowserRequest):
def getPreferredCharsets(self):
return ['UTF-8']
def zope_app_factory(config):
storageFactory = config.StorageFactory(config)
def zope_app(environ, start_response):
storage = storageFactory(config.dbschema)
appRoot = Root(storage)
request = BrowserRequest(environ['wsgi.input'], environ)
request = Request(environ['wsgi.input'], environ)
request.setPublication(Publication(appRoot))
request = publish(request, True)
response = request.response
@ -27,6 +36,12 @@ def zope_app_factory(config):
class Publication(DefaultPublication):
def beforeTraversal(self, request):
super(Publication, self).beforeTraversal(request)
from scopes.web.auth.oidc import authentication
prc = authentication.authenticate(request)
request.setPrincipal(prc)
def traverseName(self, request, ob, name):
next = getView(request, ob, name)
if next is not None:

View file

@ -0,0 +1 @@
"""package scopes.web.auth"""

304
scopes/web/auth/oidc.py Normal file
View file

@ -0,0 +1,304 @@
# scopes.web.auth.uidc
from cryptography.fernet import Fernet
from datetime import datetime, timedelta, timezone
from email.utils import formatdate
import json
import jwt
import logging
import requests
from time import time
from urllib.parse import urlencode
from zope.authentication.interfaces import IAuthentication, IPrincipal
from zope.interface import Attribute, Interface, implementer
from zope.publisher.interfaces import Unauthorized
from zope.security.interfaces import IGroupAwarePrincipal
from scopes.web.browser import DefaultView, register
from scopes.storage.folder import DummyFolder, Root
from scopes import util
import config
logger = logging.getLogger('web.auth.oidc')
# OIDC authentication for browser users (principals)
@implementer(IAuthentication)
class OidcAuthentication:
def __init__(self, baseAuth):
self.baseAuth = baseAuth
def authenticate(self, request):
auth = Authenticator(request)
prc = auth.authenticate()
if prc is None and self.baseAuth is not None:
prc = self.baseAuth.authenticate(request)
if prc is None:
prc = self.unauthenticatedPrincipal()
return prc
def getPrincipal(self, id):
if self.baseAuth is not None:
return self.baseAuth.getPrincipal(id)
def getPrincipals(self, s):
if self.baseAuth is not None:
return self.baseAuth.getPrincipals(s)
def unauthenticatedPrincipal(self):
if self.baseAuth is not None:
return self.baseAuth.unauthenticatedPrincipal()
def unauthorized(self, id, request):
if self.baseAuth is not None:
return self.baseAuth.unauthorized(id, request)
Authenticator(request).login()
def logout(self, request):
Authenticator(request).logout()
authentication = OidcAuthentication(None)
class IExternalPrincipal(Interface):
extUserLink = Attribute('Link to OIDC provider for viewing/editing external user')
@implementer(IGroupAwarePrincipal, IExternalPrincipal)
class Principal:
def __init__(self, id, data):
self.id = id
self.data = data
@property
def title(self):
return self.data['name']
@property
def groups(self):
return self.data.get('groups', [])
@property
def extUserLink(self):
return config.oidc_provider + '/ui/console/users/me'
def asDict(self):
data = self.data.copy()
data['id'] = self.id
return data
class Authenticator(DummyFolder):
prefix = 'auth.oidc'
group_prefix = 'gloops.'
def __init__(self, request):
self.request = request
self.params = config.oidc_params
self.setCrypt(self.params.get('cookie_crypt'))
def setCrypt(self, key):
self.cookieCrypt = key and Fernet(key) or None
def authenticate(self):
''' return principal or None'''
data = self.loadSession()
logger.debug('authenticate: %s', data)
if data and 'userid' in data:
id = data.pop('userid')
return Principal(id, data)
return None
def login(self):
state = util.rndstr()
nonce = util.rndstr()
codeVerifier = util.rndstr2()
codeChallenge = util.hashS256(codeVerifier)
reqUrl = self.request.form.get('camefrom') or config.base_url
args = dict(
client_id=self.params['client_id'],
response_type='code', # 'code id_token token',
state=state, nonce=nonce,
code_challenge=codeChallenge, code_challenge_method='S256',
scope='openid profile email urn:zitadel:iam:user:resourceowner',
redirect_uri=self.params['callback_url'],
)
self.storeSession(dict(state=state, nonce=nonce, request_uri=reqUrl,
code_verifier=codeVerifier))
authUrl = self.params['op_uris']['authorization_endpoint']
loginUrl = '?'.join((authUrl, urlencode(args)))
logger.debug('login: URL %s', loginUrl)
self.request.response.redirect(loginUrl, trusted=True)
def callback(self, groupsProvider=None):
req = self.request
logger.debug('callback: %s %s', self, req.form)
sdata = self.loadSession()
reqUrl = sdata.get('request_uri') or config.base_url
code = req.form['code']
# !check state: req.form['state'] == sdata['state']
args = dict(
grant_type='authorization_code',
code=code,
redirect_uri=self.params['callback_url'],
client_id=self.params['client_id'],
code_verifier=sdata['code_verifier']
)
# !set header: 'Content-Type: application/x-www-form-urlencoded'
tokenUrl = self.params['op_uris']['token_endpoint']
tokenResponse = requests.post(tokenUrl, data=args)
tdata = tokenResponse.json()
userData = self.getIdTokenData(tdata['id_token'])
userId = userData['sub']
if not '.' in userId:
userId = (self.params.get('principal_prefix', '') +
userData['preferred_username'])
groups = userData.get('urn:zitadel:iam:org:project:roles', {})
groups = set(self.group_prefix + g for g in groups)
if groupsProvider is not None:
groups = groups.union(groupsProvider(userId))
ndata = dict(
userid=userId,
name=userData['name'],
email=userData['email'],
groups=list(groups),
access_token=tdata['access_token'],
session_id=userData['sid'],
)
self.storeSession(ndata)
logger.debug('callback: session data: %s', ndata)
req.response.redirect(reqUrl, trusted=True)
def logout(self):
logoutUrl = self.params['op_uris']['end_session_endpoint']
args = dict(
client_id=self.params['client_id'],
post_logout_redirect_uri=config.base_url,
)
logoutUrl = '?'.join((logoutUrl, urlencode(args)))
cname = self.params['cookie_name']
logger.debug('logout, cookie: %s, url: %s', cname, logoutUrl)
self.request.response.expireCookie(cname, path='/')
self.request.response.redirect(logoutUrl, trusted=True)
def storeSession(self, data):
lifetime = int(self.params['cookie_lifetime'])
options = dict(
path='/',
expires=formatdate(time() + lifetime, localtime=False, usegmt=True),
httponly=True,
)
options['max-age'] = lifetime
domain = self.params['cookie_domain']
if domain:
options['domain'] = domain
name = self.params['cookie_name']
value = json.dumps(data)
if self.cookieCrypt:
value = self.cookieCrypt.encrypt(value.encode('UTF-8')).decode('ASCII')
self.request.response.setCookie(name, value, **options)
def loadSession(self):
cookie = self.request.getCookies().get(self.params['cookie_name'])
if cookie is None:
return {}
if self.cookieCrypt:
cookie = self.cookieCrypt.decrypt(cookie)
# !error check: return None - or raise error?
data = json.loads(cookie)
return data
def getIdTokenData(self, token):
uri = self.params['op_uris']['jwks_uri']
keys = loadOidcKeys(uri)
header = jwt.get_unverified_header(token)
key = jwt.PyJWK(keys[header['kid']])
return jwt.decode(token, key, audience=self.params['client_id'])
@register('auth')
def authView(context, request):
return Authenticator(request)
@register('login', Authenticator)
def login(context, request):
context.login()
return DefaultView(context, request)
@register('callback', Authenticator)
def callback(context, request):
context.callback()
return DefaultView(context, request)
@register('logout', Authenticator)
def logout(context, request):
context.logout()
return DefaultView(context, request)
def startup():
loadOidcProviderData()
#app.Publication.registerBeforeTraversal(
# lambda req: req.setPrincipal(authentication.authenticate(req))
oidcProviderUris = ['authorization_endpoint', 'token_endpoint',
'introspection_endpoint', 'userinfo_endpoint',
'revocation_endpoint', 'end_session_endpoint',
'device_authorization_endpoint', 'jwks_uri']
def loadOidcProviderData(force=False):
params = config.oidc_params
if force or params.get('op_uris') is None:
uris = params['op_uris'] = {}
opData = requests.get(params['op_config_url']).json()
for key in oidcProviderUris:
uris[key] = opData[key]
#if force or params.get('op_keys') is None:
#params['op_keys'] = requests.get(uris['jwks_uri']).json()['keys']
def loadOidcKeys(uri):
return dict((item['kid'], item) for item in requests.get(uri).json()['keys'])
# service user authentication
def authenticateClient(paramsName='oidc_params'):
loadOidcProviderData()
params = getattr(config, paramsName)
keyData = loadPrivateKeyData(params['private_key_file'])
userId = keyData['userId']
keyId = keyData['keyId']
key = keyData['key']
now = datetime.now(timezone.utc)
token_lifetime=params.get('api_token_lifetime', 60)
payload = dict(
iss=userId, sub=userId, aud=config.oidc_provider,
iat=now, exp=now + timedelta(minutes=token_lifetime),
)
jwToken = jwt.encode(payload, key, algorithm="RS256",
headers=dict(alg='RS256', kid=keyId))
data = dict(
grant_type='urn:ietf:params:oauth:grant-type:jwt-bearer',
scope=' '.join(('openid', params['op_project_scope'])),
assertion=jwToken,
)
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
url = params['op_uris']['token_endpoint']
resp = requests.post(url, data=data, headers=headers)
if resp.status_code != 200:
#print(resp.text)
logger.error('authenticateClient: %s', resp.text)
return None
tdata = resp.json()
#print(tdata)
return tdata['access_token']
def loadPrivateKeyData(fn='.private-key.json'):
with open(fn) as f:
return json.load(f)

View file

@ -1,21 +1,25 @@
# scopes.server.browser
# scopes.web.browser
import json
import logging
from zope.interface import implementer
from scopes.interfaces import IContainer, IReference, IView
logger = logging.getLogger('web.browser')
views = {} # registry for all views: {name: {prefix: viewClass, ...}, ...}
def register(name, *contextClasses):
"""Use as decorator: `@register(name, class, ...).
class `None` means default view for all classes."""
def register(name, *contextTypes):
"""Use as decorator: `@register(name, class_or_prefix, ...).
No class (or `None` or `''`) means default view for all classes."""
def doRegister(factory):
implementer(IView)(factory)
nameEntry = views.setdefault(name, {})
for cl in contextClasses:
nameEntry[cl.prefix] = factory
else:
nameEntry[''] = factory
cts = contextTypes or ['']
for ct in cts:
if not isinstance(ct, str):
ct = ct.prefix
nameEntry[ct] = factory
return factory
return doRegister
@ -28,6 +32,7 @@ def getView(request, ob, name):
factory = nameEntry.get('')
if factory is None:
return None
logger.debug('getView: %s %s', ob, request['PATH_INFO'])
return factory(ob, request)
@ -54,10 +59,11 @@ class DefaultView:
result['target'] = target.asDict()
if IContainer.providedBy(target):
result['target']['items'] = [v.asDict() for v in target.values()]
prc = self.request.principal
if prc is not None:
result['principal'] = prc.asDict()
return result
def render(self, result):
self.request.response.setHeader('Content-type', 'application/json; charset=utf-8')
return json.dumps(result).encode('UTF-8')

43
scopes/web/client.py Normal file
View file

@ -0,0 +1,43 @@
# scopes.web.client
"""Web client functionality: access to web sites, APIs with authentication."""
import logging
import requests
from scopes.web.auth import oidc
logger = logging.getLogger('web.client')
class ApiClient:
def __init__(self, baseUrl, authToken=None):
self.baseUrl = baseUrl
self.authToken = authToken
def authentication(self):
if self.authToken == None:
self.authToken = oidc.authenticateClient()
return dict(Authorization=f'Bearer {self.authToken}')
def post(self, endpoint, data):
headers = self.authentication()
headers['Content-Type'] = 'application/json'
headers['Connect-Protocol-Version'] = '1'
# self.makeUrl(endpoint)
url = '/'.join((self.baseUrl, endpoint))
resp = requests.post(url, json=data, headers=headers)
if resp.status_code >= 400:
logger.error('post %s: %s %s', url, resp.status_code, resp.text)
return resp.status_code, resp.json()
def put(self, endpoint, objId, data):
headers = self.authentication()
headers['Content-Type'] = 'application/json'
# self.makeUrl(endpoint, objId)
url = '/'.join((self.baseUrl, endpoint, objId))
resp = requests.put(url, json=data, headers=headers)
if resp.status_code >= 400:
logger.error('post %s: %s %s', url, resp.status_code, resp.text)
return resp.status_code, resp.json()

View file

@ -1,47 +1,4 @@
from setuptools import setup, find_packages
import os
from setuptools import setup
version = '2.0'
setup()
long_description = (
open('README.md').read()
+ '\n' +
'Contributors\n'
'============\n'
+ '\n' +
open('CONTRIBUTORS.txt').read()
+ '\n' +
open('CHANGES.txt').read()
+ '\n')
setup(name='py-scopes',
version=version,
description="combined triple and event storage for the cco application platform",
long_description=long_description,
# Get more strings from
# http://pypi.python.org/pypi?%3Aaction=list_classifiers
classifiers=[
"Programming Language :: Python",
],
keywords='',
author='cyberconcepts.org team',
author_email='team@cyberconcepts.org',
url='http://www.cyberconcepts.org',
license='MIT',
packages=find_packages(),
#package_dir = {'': 'src'},
#namespace_packages=['cco'],
include_package_data=True,
zip_safe=False,
install_requires=[
'setuptools',
'transaction',
'psycopg2-binary',
'SQLAlchemy',
'zope.sqlalchemy',
# -*- Extra requirements: -*-
],
entry_points="""
# -*- Entry points: -*-
""",
)