auth: basic OIDC flow with cookie encryption and final redirect working
This commit is contained in:
		
							parent
							
								
									ec80be5f97
								
							
						
					
					
						commit
						7bca60e74c
					
				
					 3 changed files with 28 additions and 17 deletions
				
			
		|  | @ -31,5 +31,6 @@ oidc_params = dict( | ||||||
|     cookie_name=getenv('OIDC_COOKIE_NAME', 'oidc_' + oidc_client_id), |     cookie_name=getenv('OIDC_COOKIE_NAME', 'oidc_' + oidc_client_id), | ||||||
|     cookie_domain=getenv('OIDC_COOKIE_DOMAIN', None), |     cookie_domain=getenv('OIDC_COOKIE_DOMAIN', None), | ||||||
|     cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'), |     cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'), | ||||||
|  |     cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None) | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -29,6 +29,7 @@ app = [ | ||||||
| 	"zope.publisher",  | 	"zope.publisher",  | ||||||
| 	"zope.traversing", | 	"zope.traversing", | ||||||
| ] | ] | ||||||
|  | auth = ["pyjwt[crypto]", "cryptography"] | ||||||
| test = ["zope.testrunner"] | test = ["zope.testrunner"] | ||||||
| #test = ["pytest"] | #test = ["pytest"] | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -1,9 +1,8 @@ | ||||||
| # scopes.server.auth | # scopes.server.auth | ||||||
| 
 | 
 | ||||||
|  | from cryptography.fernet import Fernet | ||||||
| from email.utils import formatdate | from email.utils import formatdate | ||||||
| import json | import json | ||||||
| #from oic import oic, rndstr, unreserved |  | ||||||
| #from oic.oic.message import AuthorizationResponse |  | ||||||
| import requests | import requests | ||||||
| from time import time | from time import time | ||||||
| from urllib.parse import urlencode | from urllib.parse import urlencode | ||||||
|  | @ -61,17 +60,23 @@ class Authenticator(DummyFolder): | ||||||
|     def __init__(self, request): |     def __init__(self, request): | ||||||
|         self.request = request |         self.request = request | ||||||
|         self.params = config.oidc_params |         self.params = config.oidc_params | ||||||
|  |         self.reqUrl = config.base_url | ||||||
|  |         self.setCrypt(self.params['cookie_crypt']) | ||||||
|  | 
 | ||||||
|  |     def setReqUrl(self, base, path): | ||||||
|  |         self.reqUrl = '/'.join((base, path)) | ||||||
|  | 
 | ||||||
|  |     def setCrypt(self, key): | ||||||
|  |         self.cookieCrypt = key and Fernet(key.encode('ASCII')) or None | ||||||
| 
 | 
 | ||||||
|     def authenticate(request): |     def authenticate(request): | ||||||
|  |         ''' return user data or None ''' | ||||||
|         return None |         return None | ||||||
| 
 | 
 | ||||||
|     def login(self): |     def login(self): | ||||||
|         req = self.request |         req = self.request | ||||||
|         print('*** login', self, req.getTraversalStack(), req['PATH_INFO']) |         print('*** login', self, req.getTraversalStack(), req['PATH_INFO']) | ||||||
|         #print('***', dir(req)) |         #print('***', dir(req)) | ||||||
|         #client = oic.Client() |  | ||||||
|         #providerInfo = client.provider_config(config.oidc_provider) |  | ||||||
|         #print('***', providerInfo) |  | ||||||
|         state = util.rndstr() |         state = util.rndstr() | ||||||
|         nonce = util.rndstr() |         nonce = util.rndstr() | ||||||
|         codeVerifier = util.rndstr2() |         codeVerifier = util.rndstr2() | ||||||
|  | @ -83,40 +88,40 @@ class Authenticator(DummyFolder): | ||||||
|                 code_challenge=codeChallenge, code_challenge_method='S256', |                 code_challenge=codeChallenge, code_challenge_method='S256', | ||||||
|                 scope='openid profile email urn:zitadel:iam:user:resourceowner', |                 scope='openid profile email urn:zitadel:iam:user:resourceowner', | ||||||
|                 redirect_uri=self.params['callback_url'], |                 redirect_uri=self.params['callback_url'], | ||||||
|  |                 request_uri=self.reqUrl, | ||||||
|         ) |         ) | ||||||
|         #addArgs, codeVerifier = client.add_code_challenge() |  | ||||||
|         #print('***', addArgs, codeVerifier) |  | ||||||
|         #args.update(addArgs) |  | ||||||
|         self.storeSession(dict(state=state, nonce=nonce, code_verifier=codeVerifier)) |         self.storeSession(dict(state=state, nonce=nonce, code_verifier=codeVerifier)) | ||||||
|         loginUrl = '?'.join((self.params['auth_url'], urlencode(args))) |         loginUrl = '?'.join((self.params['auth_url'], urlencode(args))) | ||||||
|         #authReq = client.construct_AuthorizationRequest(request_args=args) |  | ||||||
|         #loginUrl = authReq.request(self.params['auth_url']) |  | ||||||
|         print('***', loginUrl) |         print('***', loginUrl) | ||||||
|         req.response.redirect(loginUrl, trusted=True) |         req.response.redirect(loginUrl, trusted=True) | ||||||
| 
 | 
 | ||||||
|     def callback(self): |     def callback(self): | ||||||
|         req = self.request |         req = self.request | ||||||
|         print('*** callback', self, req.form) |         print('*** callback', self, req.form) | ||||||
|         data = self.loadSession() |         sdata = self.loadSession() | ||||||
|         code = req.form['code'] |         code = req.form['code'] | ||||||
|         print('***', data, code) |         print('*** session data', sdata, code) | ||||||
|         # !check state: req.form['state'] == data['state'] |         # !check state: req.form['state'] == sdata['state'] | ||||||
|         args = dict( |         args = dict( | ||||||
|                 grant_type='authorization_code', |                 grant_type='authorization_code', | ||||||
|                 code=code, |                 code=code, | ||||||
|                 redirect_uri=self.params['callback_url'], |                 redirect_uri=self.params['callback_url'], | ||||||
|                 client_id=self.params['client_id'], |                 client_id=self.params['client_id'], | ||||||
|                 code_verifier=data['code_verifier'] |                 code_verifier=sdata['code_verifier'] | ||||||
|         ) |         ) | ||||||
|         # !set header: 'Content-Type: application/x-www-form-urlencoded' |         # !set header: 'Content-Type: application/x-www-form-urlencoded' | ||||||
|         tokenResponse = requests.post(self.params['token_url'], data=args) |         tokenResponse = requests.post(self.params['token_url'], data=args) | ||||||
|         tdata =  tokenResponse.json() |         tdata =  tokenResponse.json() | ||||||
|         print('***', tdata) |         print('*** token response', tdata) | ||||||
|         headers = dict(Authorization='Bearer ' + tdata['access_token']) |         headers = dict(Authorization='Bearer ' + tdata['access_token']) | ||||||
|         userInfo = requests.get(self.params['userinfo_url'], headers=headers) |         userInfo = requests.get(self.params['userinfo_url'], headers=headers) | ||||||
|         print('***', userInfo.json()) |         print('***', userInfo.json()) | ||||||
|         #self.storeSession(...) |         # get relevant data from userInfo | ||||||
|         #self.req.response.redirect(...) |         # set up session data for authenticate() | ||||||
|  |         ndata = dict( | ||||||
|  |         ) | ||||||
|  |         self.storeSession(ndata) | ||||||
|  |         req.response.redirect(self.reqUrl, trusted=True) | ||||||
| 
 | 
 | ||||||
|     def logout(self): |     def logout(self): | ||||||
|         pass |         pass | ||||||
|  | @ -132,12 +137,16 @@ class Authenticator(DummyFolder): | ||||||
|         #options['httponly'] = True |         #options['httponly'] = True | ||||||
|         name = self.params['cookie_name'] |         name = self.params['cookie_name'] | ||||||
|         value = json.dumps(data) |         value = json.dumps(data) | ||||||
|  |         if self.cookieCrypt: | ||||||
|  |             value = self.cookieCrypt.encrypt(value.encode('ASCII')).decode('ASCII') | ||||||
|         self.request.response.setCookie(name, value, **options) |         self.request.response.setCookie(name, value, **options) | ||||||
| 
 | 
 | ||||||
|     def loadSession(self): |     def loadSession(self): | ||||||
|         cookie = self.request.getCookies().get(self.params['cookie_name']) |         cookie = self.request.getCookies().get(self.params['cookie_name']) | ||||||
|         if cookie is None: |         if cookie is None: | ||||||
|             raise ValueError('Missing authentication cookie') |             raise ValueError('Missing authentication cookie') | ||||||
|  |         if self.cookieCrypt: | ||||||
|  |             cookie = self.cookieCrypt.decrypt(cookie).decode('ASCII') | ||||||
|         data = json.loads(cookie) |         data = json.loads(cookie) | ||||||
|         return data |         return data | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		
		Reference in a new issue