org.user, web.client: user creation via zitadel API basically working

This commit is contained in:
Helmut Merz 2025-07-31 17:24:17 +02:00
parent c23069a3c1
commit 7427370b5c
4 changed files with 39 additions and 17 deletions

View file

@ -42,8 +42,6 @@ oidc_params = dict(
cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'),
cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None),
private_key_file=getenv('OIDC_SERVICE_USER_PRIVATE_KEY_FILE', '.private-key.json'),
organization_id=getenv('OIDC_ORGANIZATION_ID', '311473502274248525'),
)
oidc_provider_endpoints = dict(
user='v2/users/human',
)

View file

@ -12,4 +12,5 @@ DBSCHEMA=demo
OIDC_PROVIDER=
OIDC_CLIENT_ID=
OIDC_COOKIE_CRYPT=
OIDC_ORGANIZATION_ID=

View file

@ -3,6 +3,8 @@
"""Basic user account (principal) definitions + access to identity provider."""
from dataclasses import dataclass, field
from typing import List, Optional
from scopes.web import client
from scopes import util
@ -12,33 +14,48 @@ import config
@dataclass
class User:
name: str
login: str
email: str
fullName: str
hashedPassword: Optional[str] = None
firstName: str = ''
lastName: str = ''
grants: List[str] = field(default_factory=list)
class ExtUser:
"""All infos for exchanging user data with an external service.
This base class implements the zitadel interface. For other
identity providers sublass accordingly.
This base class implements the zitadel interface (as of version 3.3.2).
For other identity providers sublass accordingly.
"""
provider = 'zitatel'
endpoints = dict(
users='v2/users/human',
users_human='v2/users/human',
)
def __init__(self, user, organization, userId=None, userIdPrefix='', grants=None):
def __init__(self, user, idPrefix=''):
self.user = user
self.grants = grants or []
self.userId = idPrefix + user.login
def asDict(self):
return dict(username=self.user.name)
params = config.oidc_params
data = dict(
userId=self.userId,
username=self.user.login,
email=dict(email=self.user.email, isVerified=True),
profile=dict(
givenName=self.user.firstName,
familyName=self.user.lastName,
),
organization=dict(orgId=params['organization_id']),
)
if self.user.hashedPassword:
data['hashedPassword'] = self.user.hashedPassword
return data
def send(self):
clt = client.ApiClient(config.oidc_provider)
data = self.asDict()
res = clt.post(config.oidc_provider_endpoints['users'], data)
res = clt.post(self.endpoints['users_human'], data)

View file

@ -2,16 +2,20 @@
"""Web client functionality: access to web sites, APIs with authentication."""
import logging
import requests
from scopes.web.auth import oidc
import config
logger = logging.getLogger('web.client')
class ApiClient:
def __init__(self, baseUrl):
def __init__(self, baseUrl, authToken=None):
self.baseUrl = baseUrl
self.authToken = None
self.authToken = authToken
def authentication(self):
if self.authToken == None:
@ -21,9 +25,11 @@ class ApiClient:
def post(self, endpoint, data):
headers = self.authentication()
# self.makeUrl(endpoint)
url = '/'.join(self.baseUrl, endpoint)
resp = requests.post(url, data=data, headers=headers)
# check: resp.status_code
url = '/'.join((self.baseUrl, endpoint))
resp = requests.post(url, json=data, headers=headers)
if resp.status_code != 200:
logger.error('post %s: %s', url, resp.text)
return resp.text
data = resp.json()
return data