work in progress: oidc auth tests
This commit is contained in:
		
							parent
							
								
									01fc7d2874
								
							
						
					
					
						commit
						6857601ab8
					
				
					 2 changed files with 17 additions and 15 deletions
				
			
		|  | @ -10,6 +10,8 @@ from zope.publisher.publish import publish | ||||||
| from scopes.web.app import Publication | from scopes.web.app import Publication | ||||||
| from scopes.storage.folder import Root | from scopes.storage.folder import Root | ||||||
| 
 | 
 | ||||||
|  | logger = logging.getLogger('tlib_web') | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| def publishRequest(config, storage, path): | def publishRequest(config, storage, path): | ||||||
|     appRoot = Root(storage) |     appRoot = Root(storage) | ||||||
|  | @ -20,7 +22,6 @@ def publishRequest(config, storage, path): | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def test_app(self, config): | def test_app(self, config): | ||||||
|     logger = logging.getLogger('tlib_web') |  | ||||||
|     storage = config.storageFactory(config.dbschema) |     storage = config.storageFactory(config.dbschema) | ||||||
|     response = publishRequest(config, storage, '/top') |     response = publishRequest(config, storage, '/top') | ||||||
|     logger.info('test_app: response %s %s', response.getStatus(), response.getHeaders()) |     logger.info('test_app: response %s %s', response.getStatus(), response.getHeaders()) | ||||||
|  | @ -31,3 +32,8 @@ def test_auth(self, config): | ||||||
|     from scopes.web.auth import oidc |     from scopes.web.auth import oidc | ||||||
|     oidc.startup()  # todo: use generic app.startServices() |     oidc.startup()  # todo: use generic app.startServices() | ||||||
|     self.assertEqual(len(config.oidc_params['op_uris']), 8) |     self.assertEqual(len(config.oidc_params['op_uris']), 8) | ||||||
|  |     storage = config.storageFactory(config.dbschema) | ||||||
|  |     response = publishRequest(config, storage, '/top/auth/login') | ||||||
|  |     headers = dict(response.getHeaders()) | ||||||
|  |     logger.info('test_auth: response %s %s', response.getStatus(), headers) | ||||||
|  |     self.assertEqual(response.getStatus(), 302) | ||||||
|  |  | ||||||
|  | @ -199,24 +199,20 @@ class Authenticator(DummyFolder): | ||||||
|         return data |         return data | ||||||
| 
 | 
 | ||||||
|     def getIdTokenData(self, token): |     def getIdTokenData(self, token): | ||||||
|         keyUri = self.params['op_uris']['jwks_uri'] |         uri = self.params['op_uris']['jwks_uri'] | ||||||
|         jwksClient = jwt.PyJWKClient(keyUri) |         keys = self.loadPublicKeys(uri) | ||||||
|  |         header = jwt.get_unverified_header(token) | ||||||
|  |         key = jwt.PyJWK(keys[header['kid']]) | ||||||
|  |         return jwt.decode(token, key, audience=self.params.client_id) | ||||||
|  |         jwksClient = jwt.PyJWKClient(uri) | ||||||
|         key = jwksClient.get_signing_key_from_jwt(token) |         key = jwksClient.get_signing_key_from_jwt(token) | ||||||
|         return jwt.decode(token, key, options=dict(verify_aud=False)) |         return jwt.decode(token, key, options=dict(verify_aud=False)) | ||||||
|         header = jwt.get_unverified_header(token) |  | ||||||
|         kid = header['kid'] |  | ||||||
|         key = self.loadOidcKeys()[kid] |  | ||||||
|         return jwt.decode(token, key, audience=self.params.client_id) |  | ||||||
| 
 | 
 | ||||||
|     def loadOidcKeys(self): |     def loadOidcKeys(self, uri): | ||||||
|         result = {} |         return dict((item['kid'], item) for item in requests.get(uri).json()['keys']) | ||||||
|         keyUri = self.params['op_uris']['jwks_uri'] |  | ||||||
|         for k in requests.get(keyUri).json()['keys']: |  | ||||||
|             result[k['kid']] = jwt.PyJWK(k) |  | ||||||
|         return result |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @register('auth', Root) | @register('auth') | ||||||
| def authView(context, request): | def authView(context, request): | ||||||
|     return Authenticator(request) |     return Authenticator(request) | ||||||
| 
 | 
 | ||||||
|  | @ -254,4 +250,4 @@ def loadOidcProviderData(force=False): | ||||||
|         for key in oidcProviderUris: |         for key in oidcProviderUris: | ||||||
|             uris[key] = opData[key] |             uris[key] = opData[key] | ||||||
|     #if force or params.get('op_keys') is None: |     #if force or params.get('op_keys') is None: | ||||||
|         params['op_keys'] = requests.get(uris['jwks_uri']).json()['keys'] |         #params['op_keys'] = requests.get(uris['jwks_uri']).json()['keys'] | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		
		Reference in a new issue