diff --git a/demo/config.py b/demo/config.py index 3ef052c..c2fa091 100644 --- a/demo/config.py +++ b/demo/config.py @@ -39,7 +39,8 @@ oidc_params = dict( cookie_name=getenv('OIDC_COOKIE_NAME', 'oidc_' + oidc_client_id), cookie_domain=getenv('OIDC_COOKIE_DOMAIN', None), cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'), - cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None) + cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None), + private_key_file=getenv('OIDC_SERVICE_USER_PRIVATE_KEY_FILE', '.private-key.json'), ) # access zitadel API diff --git a/scopes/tests/config.py b/scopes/tests/config.py index 79ba5e9..19dac0f 100644 --- a/scopes/tests/config.py +++ b/scopes/tests/config.py @@ -52,10 +52,6 @@ oidc_params = dict( cookie_name=getenv('OIDC_COOKIE_NAME', 'oidc_' + oidc_client_id), cookie_domain=getenv('OIDC_COOKIE_DOMAIN', None), cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'), - cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None) -) - -# access zitadel API -zitadel_params = dict( - private_key_file=('ZITADEL_SERVICE_USER_PRIVATE_KEY_FILE', '.private-key.json') + cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None), + private_key_file=getenv('OIDC_SERVICE_USER_PRIVATE_KEY_FILE', '.private-key.json'), ) diff --git a/scopes/web/auth/oidc.py b/scopes/web/auth/oidc.py index cd769c5..76bd12d 100644 --- a/scopes/web/auth/oidc.py +++ b/scopes/web/auth/oidc.py @@ -1,6 +1,7 @@ # scopes.web.auth.uidc from cryptography.fernet import Fernet +from datetime import datetime, timedelta, timezone from email.utils import formatdate import json import jwt @@ -22,6 +23,8 @@ import config logger = logging.getLogger('web.auth.oidc') +# OIDC authentication for browser users (principals) + @implementer(IAuthentication) class OidcAuthentication: @@ -241,3 +244,41 @@ def loadOidcProviderData(force=False): def loadOidcKeys(uri): return dict((item['kid'], item) for item in requests.get(uri).json()['keys']) + + +# service user authentication + +def authenticateClient(paramsName='oidc_params'): + loadOidcProviderData() + params = getattr(config, paramsName) + keyData = loadPrivateKeyData(params['private_key_file']) + userId = keyData['userId'] + keyId = keyData['keyId'] + key = keyData['key'] + now = datetime.now(timezone.utc) + token_lifetime=params.get('api_token_lifetime', 60) + payload = dict( + iss=userId, sub=userId, aud=config.oidc_provider, + iat=now, exp=now + timedelta(minutes=token_lifetime), + ) + jwToken = jwt.encode(payload, key, algorithm="RS256", + headers=dict(alg='RS256', kid=keyId)) + data = dict( + grant_type='urn:ietf:params:oauth:grant-type:jwt-bearer', + scope='openid urn:zitadel:iam:org:project:id:zitadel:aud', + assertion=jwToken, + ) + headers = {'Content-Type': 'application/x-www-form-urlencoded'} + url = params['op_uris']['token_endpoint'] + resp = requests.post(url, data=data, headers=headers) + if resp.status_code != 200: + #print(resp.text) + logger.error('authenticateClient: %s', resp.text) + return None + tdata = resp.json() + print(tdata) + return tdata['access_token'] + +def loadPrivateKeyData(fn='.private-key.json'): + with open(fn) as f: + return json.load(f) diff --git a/scopes/web/client.py b/scopes/web/client.py index 8eb77d5..37e313f 100644 --- a/scopes/web/client.py +++ b/scopes/web/client.py @@ -1,54 +1,29 @@ # scopes.web.client -"""Web client functionality: access to web sites, APIs - including authentication.""" +"""Web client functionality: access to web sites, APIs with authentication.""" -from datetime import datetime, timedelta, timezone -import json -import jwt import requests import config -def postApi(url, token=None): - if token is None: - token = authenticateJwt() - headers = dict(Authorization=f'Bearer {token}') - resp = requests.post(url, headers=headers) - data = resp.json() - data['_auth_token'] = token - return data +class ApiClient: -def authenticateJwt(paramsName='zitadel_params'): - params = getattr(config, paramsName) - keyData = loadPrivateKeyData(params['private_key_file']) - userId = keyData['userId'] - keyId = keyData['keyId'] - key = keyData['key'] - now = datetime.now(timezone.utc) - token_lifetime=params.get('token_lifetime', 60) - payload = dict( - iss=userId, sub=userId, aud=config.oidc_provider, - iat=now, exp=now + timedelta(minutes=token_lifetime), - ) - jwToken = jwt.encode(payload, key, algorithm="RS256", - headers=dict(alg='RS256', kid=keyId)) - data = dict( - grant_type='urn:ietf:params:oauth:grant-type:jwt-bearer', - scope='openid urn:zitadel:iam:org:project:id:zitadel:aud', - assertion=jwToken, - ) - print(data) - headers = {'Content-Type': 'application/x-www-form-urlencoded'} - url = config.oidc_provider + '/oauth/v2/token' - print(url) - resp = requests.post(url, data=data, headers=headers) - if resp.status_code != 200: - print(resp.text) - return None - tdata = resp.json() - return tdata['access_token'] + def __init__(self, baseUrl): + self.baseUrl = baseUrl + self.authToken = None + + def authentication(self): + if self.authToken = None: + self.authToken = oidc.authenticateClient() + return dict(Authorization=f'Bearer {self.authToken}') + + def post(self, endpoint, data): + headers = self.authentication() + # self.makeUrl(endpoint) + url = '/'.join(self.bareUrl, endpoint) + resp = requests.post(url, data=data, headers=headers) + # check: resp.status_code + data = resp.json() + return data -def loadPrivateKeyData(fn='.private-key.json'): - with open(fn) as f: - return json.load(f)