auth: store user data in cookie, retrieve in authenticate()

This commit is contained in:
Helmut Merz 2025-04-05 12:31:26 +02:00
parent 7bca60e74c
commit 8d3ff5b667
3 changed files with 50 additions and 19 deletions

View file

@ -36,6 +36,12 @@ def zope_app_factory(config):
class Publication(DefaultPublication): class Publication(DefaultPublication):
def beforeTraversal(self, request):
super(Publication, self).beforeTraversal(request)
from scopes.server.auth import authentication
prc = authentication.authenticate(request)
request.setPrincipal(prc)
def traverseName(self, request, ob, name): def traverseName(self, request, ob, name):
next = getView(request, ob, name) next = getView(request, ob, name)
if next is not None: if next is not None:

View file

@ -6,7 +6,7 @@ import json
import requests import requests
from time import time from time import time
from urllib.parse import urlencode from urllib.parse import urlencode
from zope.authentication.interfaces import IAuthentication from zope.authentication.interfaces import IAuthentication, IPrincipal
from zope.interface import implementer from zope.interface import implementer
from zope.publisher.interfaces import Unauthorized from zope.publisher.interfaces import Unauthorized
@ -17,11 +17,6 @@ from scopes import util
import config import config
def authenticate(request):
#print('*** authenticate')
return None
@implementer(IAuthentication) @implementer(IAuthentication)
class OidcAuthentication: class OidcAuthentication:
@ -29,8 +24,8 @@ class OidcAuthentication:
self.baseAuth = baseAuth self.baseAuth = baseAuth
def authenticate(self, request): def authenticate(self, request):
prc = authenticate(request) auth = Authenticator(request)
# prc = Authenticator().authenticate(request) prc = auth.authenticate()
if prc is None and self.baseAuth is not None: if prc is None and self.baseAuth is not None:
prc = self.baseAuth.authenticate(request) prc = self.baseAuth.authenticate(request)
return prc return prc
@ -52,6 +47,21 @@ class OidcAuthentication:
JwtAuthentication = OidcAuthentication # old name - still used? JwtAuthentication = OidcAuthentication # old name - still used?
authentication = OidcAuthentication(None)
@implementer(IPrincipal)
class Principal:
def __init__(self, id, data):
self.id = id
self.data = data
def asDict(self):
data = self.data.copy()
data['id'] = self.id
return data
class Authenticator(DummyFolder): class Authenticator(DummyFolder):
@ -61,16 +71,21 @@ class Authenticator(DummyFolder):
self.request = request self.request = request
self.params = config.oidc_params self.params = config.oidc_params
self.reqUrl = config.base_url self.reqUrl = config.base_url
self.setCrypt(self.params['cookie_crypt']) self.setCrypt(self.params.get('cookie_crypt'))
def setReqUrl(self, base, path): def setReqUrl(self, base, path):
self.reqUrl = '/'.join((base, path)) self.reqUrl = '/'.join((base, path))
def setCrypt(self, key): def setCrypt(self, key):
self.cookieCrypt = key and Fernet(key.encode('ASCII')) or None self.cookieCrypt = key and Fernet(key) or None
def authenticate(request): def authenticate(self):
''' return user data or None ''' ''' return principal or None'''
data = self.loadSession()
print('*** authenticate', data)
if data and 'userid' in data:
id = data.pop('userid')
return Principal(id, data)
return None return None
def login(self): def login(self):
@ -115,10 +130,13 @@ class Authenticator(DummyFolder):
print('*** token response', tdata) print('*** token response', tdata)
headers = dict(Authorization='Bearer ' + tdata['access_token']) headers = dict(Authorization='Bearer ' + tdata['access_token'])
userInfo = requests.get(self.params['userinfo_url'], headers=headers) userInfo = requests.get(self.params['userinfo_url'], headers=headers)
print('***', userInfo.json()) userData = userInfo.json()
# get relevant data from userInfo print('*** user data', userData)
# set up session data for authenticate()
ndata = dict( ndata = dict(
userid=userData['preferred_username'],
name=userData['name'],
email=userData['email'],
access_token=tdata['access_token'],
) )
self.storeSession(ndata) self.storeSession(ndata)
req.response.redirect(self.reqUrl, trusted=True) req.response.redirect(self.reqUrl, trusted=True)
@ -127,7 +145,7 @@ class Authenticator(DummyFolder):
pass pass
def storeSession(self, data): def storeSession(self, data):
options = {} options = dict(path='/')
lifetime = int(self.params['cookie_lifetime']) lifetime = int(self.params['cookie_lifetime'])
options['expires'] = formatdate(time() + lifetime, localtime=False, usegmt=True) options['expires'] = formatdate(time() + lifetime, localtime=False, usegmt=True)
options['max-age'] = lifetime options['max-age'] = lifetime
@ -137,16 +155,20 @@ class Authenticator(DummyFolder):
#options['httponly'] = True #options['httponly'] = True
name = self.params['cookie_name'] name = self.params['cookie_name']
value = json.dumps(data) value = json.dumps(data)
print('*** storeSession', name, value, options)
if self.cookieCrypt: if self.cookieCrypt:
value = self.cookieCrypt.encrypt(value.encode('ASCII')).decode('ASCII') value = self.cookieCrypt.encrypt(value.encode('UTF-8')).decode('ASCII')
self.request.response.setCookie(name, value, **options) self.request.response.setCookie(name, value, **options)
def loadSession(self): def loadSession(self):
cookie = self.request.getCookies().get(self.params['cookie_name']) cookie = self.request.getCookies().get(self.params['cookie_name'])
if cookie is None: if cookie is None:
raise ValueError('Missing authentication cookie') return {}
#raise ValueError('Missing authentication cookie')
if self.cookieCrypt: if self.cookieCrypt:
cookie = self.cookieCrypt.decrypt(cookie).decode('ASCII') cookie = self.cookieCrypt.decrypt(cookie)
print('*** loadSession', self.params['cookie_name'], cookie)
# !error check: return None - or raise error?
data = json.loads(cookie) data = json.loads(cookie)
return data return data

View file

@ -55,6 +55,9 @@ class DefaultView:
result['target'] = target.asDict() result['target'] = target.asDict()
if IContainer.providedBy(target): if IContainer.providedBy(target):
result['target']['items'] = [v.asDict() for v in target.values()] result['target']['items'] = [v.asDict() for v in target.values()]
prc = self.request.principal
if prc is not None:
result['principal'] = prc.asDict()
return result return result
def render(self, result): def render(self, result):