From b425462f126eff81e6ffb0fe36cb1751d0170783 Mon Sep 17 00:00:00 2001 From: Helmut Merz Date: Tue, 22 Jul 2025 09:41:24 +0200 Subject: [PATCH] API authentication on zitadel server working --- demo/config.py | 2 +- scopes/web/client.py | 49 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/demo/config.py b/demo/config.py index 4bfcfdf..3ef052c 100644 --- a/demo/config.py +++ b/demo/config.py @@ -44,5 +44,5 @@ oidc_params = dict( # access zitadel API zitadel_params = dict( - private_key_file=('ZITADEL_SERVICE_USER_PRIVATE_KEY_FILE', '.private-key.json') + private_key_file=getenv('ZITADEL_SERVICE_USER_PRIVATE_KEY_FILE', '.private-key.json') ) diff --git a/scopes/web/client.py b/scopes/web/client.py index dd12bf8..8eb77d5 100644 --- a/scopes/web/client.py +++ b/scopes/web/client.py @@ -2,4 +2,53 @@ """Web client functionality: access to web sites, APIs - including authentication.""" +from datetime import datetime, timedelta, timezone +import json +import jwt import requests + +import config + + +def postApi(url, token=None): + if token is None: + token = authenticateJwt() + headers = dict(Authorization=f'Bearer {token}') + resp = requests.post(url, headers=headers) + data = resp.json() + data['_auth_token'] = token + return data + +def authenticateJwt(paramsName='zitadel_params'): + params = getattr(config, paramsName) + keyData = loadPrivateKeyData(params['private_key_file']) + userId = keyData['userId'] + keyId = keyData['keyId'] + key = keyData['key'] + now = datetime.now(timezone.utc) + token_lifetime=params.get('token_lifetime', 60) + payload = dict( + iss=userId, sub=userId, aud=config.oidc_provider, + iat=now, exp=now + timedelta(minutes=token_lifetime), + ) + jwToken = jwt.encode(payload, key, algorithm="RS256", + headers=dict(alg='RS256', kid=keyId)) + data = dict( + grant_type='urn:ietf:params:oauth:grant-type:jwt-bearer', + scope='openid urn:zitadel:iam:org:project:id:zitadel:aud', + assertion=jwToken, + ) + print(data) + headers = {'Content-Type': 'application/x-www-form-urlencoded'} + url = config.oidc_provider + '/oauth/v2/token' + print(url) + resp = requests.post(url, data=data, headers=headers) + if resp.status_code != 200: + print(resp.text) + return None + tdata = resp.json() + return tdata['access_token'] + +def loadPrivateKeyData(fn='.private-key.json'): + with open(fn) as f: + return json.load(f)