move API client authentication to auth.oidc

This commit is contained in:
Helmut Merz 2025-07-22 19:07:02 +02:00
parent b425462f12
commit 626ff6e673
4 changed files with 64 additions and 51 deletions

View file

@ -39,7 +39,8 @@ oidc_params = dict(
cookie_name=getenv('OIDC_COOKIE_NAME', 'oidc_' + oidc_client_id), cookie_name=getenv('OIDC_COOKIE_NAME', 'oidc_' + oidc_client_id),
cookie_domain=getenv('OIDC_COOKIE_DOMAIN', None), cookie_domain=getenv('OIDC_COOKIE_DOMAIN', None),
cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'), cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'),
cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None) cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None),
private_key_file=getenv('OIDC_SERVICE_USER_PRIVATE_KEY_FILE', '.private-key.json'),
) )
# access zitadel API # access zitadel API

View file

@ -52,10 +52,6 @@ oidc_params = dict(
cookie_name=getenv('OIDC_COOKIE_NAME', 'oidc_' + oidc_client_id), cookie_name=getenv('OIDC_COOKIE_NAME', 'oidc_' + oidc_client_id),
cookie_domain=getenv('OIDC_COOKIE_DOMAIN', None), cookie_domain=getenv('OIDC_COOKIE_DOMAIN', None),
cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'), cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'),
cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None) cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None),
) private_key_file=getenv('OIDC_SERVICE_USER_PRIVATE_KEY_FILE', '.private-key.json'),
# access zitadel API
zitadel_params = dict(
private_key_file=('ZITADEL_SERVICE_USER_PRIVATE_KEY_FILE', '.private-key.json')
) )

View file

@ -1,6 +1,7 @@
# scopes.web.auth.uidc # scopes.web.auth.uidc
from cryptography.fernet import Fernet from cryptography.fernet import Fernet
from datetime import datetime, timedelta, timezone
from email.utils import formatdate from email.utils import formatdate
import json import json
import jwt import jwt
@ -22,6 +23,8 @@ import config
logger = logging.getLogger('web.auth.oidc') logger = logging.getLogger('web.auth.oidc')
# OIDC authentication for browser users (principals)
@implementer(IAuthentication) @implementer(IAuthentication)
class OidcAuthentication: class OidcAuthentication:
@ -241,3 +244,41 @@ def loadOidcProviderData(force=False):
def loadOidcKeys(uri): def loadOidcKeys(uri):
return dict((item['kid'], item) for item in requests.get(uri).json()['keys']) return dict((item['kid'], item) for item in requests.get(uri).json()['keys'])
# service user authentication
def authenticateClient(paramsName='oidc_params'):
loadOidcProviderData()
params = getattr(config, paramsName)
keyData = loadPrivateKeyData(params['private_key_file'])
userId = keyData['userId']
keyId = keyData['keyId']
key = keyData['key']
now = datetime.now(timezone.utc)
token_lifetime=params.get('api_token_lifetime', 60)
payload = dict(
iss=userId, sub=userId, aud=config.oidc_provider,
iat=now, exp=now + timedelta(minutes=token_lifetime),
)
jwToken = jwt.encode(payload, key, algorithm="RS256",
headers=dict(alg='RS256', kid=keyId))
data = dict(
grant_type='urn:ietf:params:oauth:grant-type:jwt-bearer',
scope='openid urn:zitadel:iam:org:project:id:zitadel:aud',
assertion=jwToken,
)
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
url = params['op_uris']['token_endpoint']
resp = requests.post(url, data=data, headers=headers)
if resp.status_code != 200:
#print(resp.text)
logger.error('authenticateClient: %s', resp.text)
return None
tdata = resp.json()
print(tdata)
return tdata['access_token']
def loadPrivateKeyData(fn='.private-key.json'):
with open(fn) as f:
return json.load(f)

View file

@ -1,54 +1,29 @@
# scopes.web.client # scopes.web.client
"""Web client functionality: access to web sites, APIs - including authentication.""" """Web client functionality: access to web sites, APIs with authentication."""
from datetime import datetime, timedelta, timezone
import json
import jwt
import requests import requests
import config import config
def postApi(url, token=None): class ApiClient:
if token is None:
token = authenticateJwt() def __init__(self, baseUrl):
headers = dict(Authorization=f'Bearer {token}') self.baseUrl = baseUrl
resp = requests.post(url, headers=headers) self.authToken = None
def authentication(self):
if self.authToken = None:
self.authToken = oidc.authenticateClient()
return dict(Authorization=f'Bearer {self.authToken}')
def post(self, endpoint, data):
headers = self.authentication()
# self.makeUrl(endpoint)
url = '/'.join(self.bareUrl, endpoint)
resp = requests.post(url, data=data, headers=headers)
# check: resp.status_code
data = resp.json() data = resp.json()
data['_auth_token'] = token
return data return data
def authenticateJwt(paramsName='zitadel_params'):
params = getattr(config, paramsName)
keyData = loadPrivateKeyData(params['private_key_file'])
userId = keyData['userId']
keyId = keyData['keyId']
key = keyData['key']
now = datetime.now(timezone.utc)
token_lifetime=params.get('token_lifetime', 60)
payload = dict(
iss=userId, sub=userId, aud=config.oidc_provider,
iat=now, exp=now + timedelta(minutes=token_lifetime),
)
jwToken = jwt.encode(payload, key, algorithm="RS256",
headers=dict(alg='RS256', kid=keyId))
data = dict(
grant_type='urn:ietf:params:oauth:grant-type:jwt-bearer',
scope='openid urn:zitadel:iam:org:project:id:zitadel:aud',
assertion=jwToken,
)
print(data)
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
url = config.oidc_provider + '/oauth/v2/token'
print(url)
resp = requests.post(url, data=data, headers=headers)
if resp.status_code != 200:
print(resp.text)
return None
tdata = resp.json()
return tdata['access_token']
def loadPrivateKeyData(fn='.private-key.json'):
with open(fn) as f:
return json.load(f)