initial import

This commit is contained in:
Helmut Merz 2024-09-30 10:33:14 +02:00
commit c9d4f525e9
20 changed files with 1451 additions and 0 deletions

12
.gitignore vendored Normal file
View file

@ -0,0 +1,12 @@
*.pyc
*.pyo
*.swp
dist/
var/
*.egg-info
*.project
*.pydevproject
*.ropeproject
*.sublime-project
*.sublime-workspace
.settings

21
LICENSE Normal file
View file

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (C) 2023 cyberconcepts.org team
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

7
README.md Normal file
View file

@ -0,0 +1,7 @@
# Introduction
This is the main part of the code of the semantic
web application platform *loops*, based on
Zope 3 / bluebream.
More information: see https://www.cyberconcepts.org.

1
cco/__init__.py Normal file
View file

@ -0,0 +1 @@
# loops-ext/cco/member

88
cco/member/README.txt Normal file
View file

@ -0,0 +1,88 @@
======================================================================
cco.member - cyberconcepts.org: member registration and authentication
======================================================================
>>> from zope.session.interfaces import ISession
>>> from zope.publisher.browser import TestRequest
>>> from logging import getLogger
>>> log = getLogger('cco.member.auth')
>>> from loops.setup import addAndConfigureObject, addObject
>>> from loops.concept import Concept
>>> from loops.common import adapted
>>> concepts = loopsRoot['concepts']
>>> len(list(concepts.keys()))
10
>>> from loops.browser.node import NodeView
>>> home = loopsRoot['views']['home']
>>> homeView = NodeView(home, TestRequest())
Session Credentials Plug-in with optional 2-factor authentication
=================================================================
>>> from cco.member.auth import SessionCredentialsPlugin
>>> scp = SessionCredentialsPlugin()
When retrieving credentials for a standard request we get the usual
login + password dictionary.
>>> input = dict(login='scott', password='tiger')
>>> req = TestRequest(home, form=input)
>>> scp.extractCredentials(req)
{'login': 'scott', 'password': 'tiger'}
When the URL contains an authentication method reference to the 2-factor
authentication the first phase of the authentication (redirection to
TAN entry form) is executed.
>>> sdata = ISession(req).get('zope.pluggableauth.browserplugins')
>>> sdata['credentials'] = None
>>> req.setTraversalStack(['++auth++2factor'])
>>> scp.extractCredentials(req)
'2fa_tan_form.html?a=...&h=...&b=...'
What if we enter data for authentication phase 2? No authentication
because the hashes don't match.
>>> sdata['credentials'] = None
>>> input = dict(hash='#dummy#', tan_a='1', tan_b='2')
>>> req = TestRequest(home, form=input)
>>> req.setTraversalStack(['++auth++2factor'])
>>> scp.extractCredentials(req)
Password Policy Checking
========================
>>> from cco.member.pwpolicy import checkPassword
>>> checkPassword(u'Test12.')
False
>>> checkPassword(u'TestTest')
False
>>> checkPassword(u'testes.')
False
>>> checkPassword(u'tesT1234')
True
>>> checkPassword(u'tesTtes.')
True
Password Change Form
====================
>>> from cco.member.browser import PasswordChange
Web API
=======
>>> from cco.member.webapi import Users

1
cco/member/__init__.py Normal file
View file

@ -0,0 +1 @@
# package cco.member

124
cco/member/auth.pt Normal file
View file

@ -0,0 +1,124 @@
<html i18n:domain="cco.member">
<metal:login define-macro="login_form"
i18n:domain="cco.member"
tal:define="principal request/principal/id">
<p style="color: Red"
tal:condition="request/error_message|nothing"
i18n:translate=""
tal:content="request/error_message" />
<h2 i18n:translate="title_login">Login</h2>
<div>
<p i18n:translate="description_login"
tal:condition="python: principal == 'zope.anybody'">
Please provide Login Information</p>
<form method="post"
tal:define="submitted python:
principal != 'zope.anybody' and 'SUBMIT' in request">
<tal:redirect condition="submitted">
<span tal:define="dummy python:request.response.redirect(
request.get('camefrom') or request.URL[-1])" />
</tal:redirect>
<tal:form condition="not:submitted">
<input type="hidden" name="base_url"
tal:attributes="value request/URL/-1" />
<div class="row">
<div class="label" i18n:translate="label_login">User Name</div>
<div class="field">
<input type="text" name="login"/></div>
</div><br />
<div class="row">
<div class="label" i18n:translate="label_password">Password</div>
<div class="field">
<input type="password" name="password"/></div>
</div><br />
<div class="row">
<input tal:condition="python:view.authenticationMethod != '2factor'"
class="form-element" type="submit"
name="SUBMIT" value="Log in"
i18n:attributes="value button_login" />
<input tal:condition="python:view.authenticationMethod == '2factor'"
class="form-element" type="submit"
name="SUBMIT" value="Log in"
i18n:attributes="value button_login_2factor" />
</div>
<input type="hidden" name="camefrom"
tal:attributes="value request/camefrom | nothing">
</tal:form>
</form>
</div>
</metal:login>
<metal:tan define-macro="tan_form"
i18n:domain="cco.member"
tal:define="principal request/principal/id;
a request/a|nothing;
b request/b|nothing;
email item/sendTanEmail;
baseUrl request/URL/-1">
<h2 i18n:translate="title_login_tan">Login: TAN Entry</h2>
<tal:form condition="python:principal == 'zope.anybody' and a and b and email">
<p i18n:translate="message_enter_tan_digits_$email_$a_$b">
An E-mail with a TAN has been sent to
<span tal:content="email" i18n:name="email" />.
Please enter digits
<strong tal:content="request/a" i18n:name="a">A</strong> and
<strong tal:content="request/b" i18n:name="b">B</strong> below.</p>
<form method="post" tal:attributes="action baseUrl">
<input type="hidden" name="hash"
tal:attributes="value request/h|nothing" />
<input type="hidden" name="camefrom"
tal:attributes="value request/camefrom|string:">
<input type="hidden" name="base_url"
tal:attributes="value baseUrl" />
<div class="row">
<div class="label" i18n:translate="label_tan_$a">TAN Digit
<span tal:content="request/a" i18n:name="a" /></div>
<div class="field">
<input type="text" name="tan_a" size="2" maxlength="1"
tal:attributes="value request/tan_a|string:"/>
</div>
</div>
<div class="row">
<div class="label" i18n:translate="label_tan_$b">TAN Digit
<span tal:content="request/b" i18n:name="b" /></div>
<div class="field">
<input type="text" name="tan_b" size="2" maxlength="1"
tal:attributes="value request/tan_b|string:"/>
</div>
</div><br />
<div class="row">
<input class="form-element" type="submit"
name="SUBMIT" value="Log in"
i18n:attributes="value button_login_tan" />
</div>
</form>
</tal:form>
<tal:redir condition="python:
(principal != 'zope.anybody' or not (a and b and email)) and
request.response.redirect(baseUrl + '?error_message=Invalid+login+data!')" />
</metal:tan>
<metal:form define-macro="reset_form">
<h1 i18n:translate=""
tal:content="view/label|default">Edit</h1>
<div>
<form method="post">
<input type="hidden" name="action" value="update" />
<tal:token condition="request/token|nothing">
<input type="hidden" name="token"
tal:attributes="value request/token|string:" />
</tal:token>
<br />
<metal:fields use-macro="view/fieldRenderers/fields" />
<br />
<input type="submit" name="submit" value="Save"
i18n:attributes="value"
tal:attributes="value view/label_submit|string:Save" />
</form>
</div>
</metal:form>
</html>

350
cco/member/auth.py Normal file
View file

@ -0,0 +1,350 @@
#
# Copyright (c) 2023 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Specialized authentication components.
"""
import hashlib
import logging
import random
from datetime import datetime, timedelta
from email.MIMEText import MIMEText
from urllib import urlencode
import requests
from zope.app.component import hooks
from zope.interface import Interface, implements
from zope import component
from zope.pluggableauth.interfaces import IAuthenticatedPrincipalFactory
from zope.pluggableauth.plugins.session import SessionCredentialsPlugin \
as BaseSessionCredentialsPlugin
from zope.pluggableauth.plugins.session import SessionCredentials
from zope.publisher.interfaces.http import IHTTPRequest
from zope.session.interfaces import ISession
from zope.traversing.browser import absoluteURL
from zope.traversing.namespace import view
from loops.browser.node import getViewConfiguration
from loops.organize.interfaces import IPresence
from loops.organize.party import getAuthenticationUtility
from loops.util import _
try:
from config import single_sign_on as sso
except ImportError:
sso = None
try:
from config import jwt_key
except ImportError:
jwt_key = None
try:
import python_jwt as jwt
import jwcrypto.jwk as jwk
except ImportError:
pass
TIMEOUT = timedelta(minutes=60)
if jwt_key is not None:
JWT_SECRET = jwk.JWK.from_pem(jwt_key)
else:
JWT_SECRET = None
#PRIVKEY = "6LcGPQ4TAAAAABCyA_BCAKPkD6wW--IhUicbAZ11" # for captcha
log = logging.getLogger('cco.member.auth')
class AuthURLNameSpace(view):
def traverse(self, name, ignored):
self.request.shiftNameToApplication()
# ignore, has already been evaluated by credentials plugin
return self.context
class TwoFactorSessionCredentials(SessionCredentials):
def __init__(self, login, password):
self.login = login
self.password = password
self.tan = random.randint(100000, 999999)
self.timestamp = datetime.now()
rng = range(len(str(self.tan)))
t1 = random.choice(rng)
rng.remove(t1)
t2 = random.choice(rng)
self.tanA, self.tanB = sorted((t1, t2))
self.hash = (hashlib.
sha224("%s:%s:%s" % (login, password, self.tan)).
hexdigest())
self.validated = False
class SessionCredentialsPlugin(BaseSessionCredentialsPlugin):
tan_a_field = 'tan_a'
tan_b_field = 'tan_b'
hash_field = 'hash'
tokenField = 'token'
secure_cookie_token_ns = 'zope_3_sct'
def setSecureCookieToken(self, credentials, request):
response = request.response
options = {}
expires = 'Tue, 19 Jan 2038 00:00:00 GMT'
options['expires'] = expires
options['secure'] = True
options['HttpOnly'] = True
payload = dict(sub=credentials.login)
secret = jwk.JWK.from_pem(jwt_key)
token = jwt.generate_jwt(payload, secret, 'PS256', timedelta(days=365))
response.setCookie(
self.secure_cookie_token_ns, token,
path=request.getApplicationURL(path_only=True),
**options)
def validateSecureCookieToken(self, request, login):
token = request.getCookies().get(self.secure_cookie_token_ns)
if token:
try:
header, claims = jwt.verify_jwt(token, JWT_SECRET, ['PS256'])
if claims.get('sub') == login:
return True
except Exception as e:
log.warn('invalid cookie token %s' % token)
return False
return False
def extractCredentials(self, request):
from cco.member.browser import validateToken
if not IHTTPRequest.providedBy(request):
return None
login = request.get(self.loginfield, None)
password = request.get(self.passwordfield, None)
token = request.get(self.tokenField)
session = ISession(request)
sessionData = session.get('zope.pluggableauth.browserplugins')
traversalStack = request.getTraversalStack()
authMethod = 'standard'
credentials = None
if not token or (token and not validateToken(token)):
if sessionData:
credentials = sessionData.get('credentials')
if isinstance(credentials, TwoFactorSessionCredentials):
authMethod = '2factor'
if (authMethod == 'standard' and
traversalStack and traversalStack[-1].startswith('++auth++')):
### SSO: do not switch to 2factor if logged-in via sso
#if not getattr(credentials, 'sso_source', None):
if not credentials and (not token or not validateToken(token)):
authMethod = traversalStack[-1][8:]
viewAnnotations = request.annotations.setdefault('loops.view', {})
viewAnnotations['auth_method'] = authMethod
#log.info('authentication method: %s.' % authMethod)
if authMethod == 'standard' or self.validateSecureCookieToken(request, login):
return self.extractStandardCredentials(
request, login, password, session, credentials)
elif authMethod == '2factor':
return self.extract2FactorCredentials(
request, login, password, session, credentials)
else:
return None
def extractStandardCredentials(self, request, login, password,
session, credentials):
sso_source = request.get('sso_source', None)
if login and password:
credentials = SessionCredentials(login, password)
### SSO: send login request to sso.targets
if sso_source:
credentials.sso_source = sso_source
else:
sso_send_login(login, password, request)
if credentials:
sessionData = session['zope.pluggableauth.browserplugins']
### SSO: do not overwrite existing credentials on sso login
if not sessionData.get('credentials') or not sso_source:
if credentials != sessionData.get('credentials'):
sessionData['credentials'] = credentials
else:
return None
login = credentials.getLogin()
password = credentials.getPassword()
return dict(login=login, password=password)
def extract2FactorCredentials(self, request, login, password,
session, credentials):
tan_a = request.get(self.tan_a_field, None)
tan_b = request.get(self.tan_b_field, None)
hash = request.get(self.hash_field, None)
if (login and password) and not (tan_a or tan_b or hash):
sessionData = session.get('zope.pluggableauth.browserplugins')
return self.processPhase1(request, session, login, password)
if (tan_a and tan_b and hash) and not (login or password):
credentials = self.processPhase2(request, session, hash, tan_a, tan_b)
if credentials and credentials.validated:
login = credentials.getLogin()
password = credentials.getPassword()
### SSO: send login request to sso.targets
if tan_a and tan_b:
sso_send_login(login, password, request)
return dict(login=login, password=password)
return None
def processPhase1(self, request, session, login, password, msg=None):
sessionData = session['zope.pluggableauth.browserplugins']
credentials = TwoFactorSessionCredentials(login, password)
sessionData['credentials'] = credentials
# send email
log.info("Processing phase 1, TAN: %s. " % credentials.tan)
params = dict(h=credentials.hash,
a=credentials.tanA+1,
b=credentials.tanB+1,
skip2factor=request.form.get('skip2factor'))
if msg:
params['loops.message'] = msg
url = self.getUrl(request, '2fa_tan_form.html', params)
return request.response.redirect(url, trusted=True)
def processPhase2(self, request, session, hash, tan_a, tan_b):
def _validate_tans(a, b, creds):
tan = str(creds.tan)
return tan[creds.tanA] == a and tan[creds.tanB] == b
sessionData = session['zope.pluggableauth.browserplugins']
credentials = sessionData.get('credentials')
if not credentials:
msg = 'Missing credentials'
return log.warn(msg)
log.info("Processing phase 2, TAN: %s. " % credentials.tan)
if credentials.hash != hash:
msg = 'Illegal hash.'
return log.warn(msg)
if credentials.timestamp < datetime.now() - TIMEOUT:
msg = 'Timeout exceeded.'
request.form['loops.message'] = msg
return log.warn(msg)
if not _validate_tans(tan_a, tan_b, credentials):
msg = 'TAN digits not correct.'
log.warn(msg)
params = dict(h=credentials.hash,
a=credentials.tanA+1,
b=credentials.tanB+1,
skip2factor=request.form.get('skip2factor'))
params['loops.message'] = msg
url = self.getUrl(request, '2fa_tan_form.html', params)
request.response.redirect(url, trusted=True)
return None
credentials.validated = True
log.info('Credentials valid.')
# TODO: only overwrite if changed:
sessionData['credentials'] = credentials
if request.form.get('skip2factor'):
self.setSecureCookieToken(credentials, request)
if request.get('camefrom'):
request.response.redirect(request['camefrom'], trusted=True)
return credentials
def getUrl(self, request, action, params):
if request.get('camefrom'):
params['camefrom'] = request['camefrom']
baseUrl = request.get('base_url') or ''
if baseUrl and not baseUrl.endswith('/'):
baseUrl += '/'
return '%s%s?%s' % (baseUrl, action, urlencode(params))
def challenge(self, request):
if not IHTTPRequest.providedBy(request):
return False
site = hooks.getSite()
path = request['PATH_INFO'].split('/++/')[-1] # strip virtual host stuff
if not path.startswith('/'):
path = '/' + path
camefrom = request.getApplicationURL() + path
if 'login' in camefrom:
camefrom = '/'.join(camefrom.split('/')[:-1])
url = '%s/@@%s?%s' % (absoluteURL(site, request),
self.loginpagename,
urlencode({'camefrom': camefrom}))
request.response.redirect(url)
return True
def logout(self, request):
presence = component.getUtility(IPresence)
presence.removePresentUser(request.principal.id)
super(SessionCredentialsPlugin, self).logout(request)
def getCredentials(request):
session = ISession(request)
sessionData = session.get('zope.pluggableauth.browserplugins')
if not sessionData:
return None
return sessionData.get('credentials')
def getPrincipalFromCredentials(context, request, credentials):
if not credentials:
return None
cred = dict(login=credentials.getLogin(),
password=credentials.getPassword())
auth = getAuthenticationUtility(context)
authenticatorPlugins = [p for n, p in auth.getAuthenticatorPlugins()]
for authplugin in authenticatorPlugins:
if authplugin is None:
continue
info = authplugin.authenticateCredentials(cred)
if info is None:
continue
info.authenticatorPlugin = authplugin
principal = component.getMultiAdapter((info, request),
IAuthenticatedPrincipalFactory)(auth)
principal.id = auth.prefix + info.id
return principal
def getPrincipalForUsername(username, context, request):
auth = getAuthenticationUtility(context)
authenticatorPlugins = [p for n, p in auth.getAuthenticatorPlugins()]
for authplugin in authenticatorPlugins:
if authplugin is None:
continue
info = authplugin.get(username)
if info is None:
continue
info.authenticatorPlugin = authplugin
principal = info
#principal = component.getMultiAdapter((info, request),
# IAuthenticatedPrincipalFactory)(auth)
principal.id = authplugin.prefix + info.login
return principal
def sso_send_login(login, password, request):
if not sso:
return
data = dict(login=login, password=password,
sso_source=sso.get('source', ''))
for url in sso['targets']:
resp = requests.post(url, data, cookies=dict(request.cookies),
allow_redirects=False)
log.info('sso_login - url: %s, login: %s -> %s.' % (
url, login, resp.status_code))

434
cco/member/browser.py Normal file
View file

@ -0,0 +1,434 @@
#
# Copyright (c) 2016 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Login, logout, unauthorized stuff.
"""
try:
import python_jwt as jwt
import jwcrypto.jws as jws
except ImportError:
pass
from datetime import timedelta
from email.MIMEText import MIMEText
import logging
from zope.app.exception.browser.unauthorized import Unauthorized as DefaultUnauth
from zope.app.pagetemplate import ViewPageTemplateFile
from zope.app.security.interfaces import IAuthentication
from zope.app.security.interfaces import ILogout, IUnauthenticatedPrincipal
from zope.cachedescriptors.property import Lazy
from zope import component
from zope.i18n import translate
from zope.i18nmessageid import MessageFactory
from zope.interface import implements
from zope.publisher.interfaces.http import IHTTPRequest
from zope.sendmail.interfaces import IMailDelivery
from cco.member.auth import getCredentials, getPrincipalFromCredentials,\
getPrincipalForUsername, JWT_SECRET
from cco.member.interfaces import IPasswordChange, IPasswordReset
from cco.member.pwpolicy import checkPassword
from cybertools.composer.schema.browser.common import schema_macros
from cybertools.composer.schema.browser.form import Form
from cybertools.composer.schema.schema import FormState, FormError
from loops.browser.concept import ConceptView
from loops.browser.node import NodeView, getViewConfiguration
from loops.common import adapted
from loops.organize.interfaces import IMemberRegistrationManager
from loops.organize.party import getPersonForUser
from loops.organize.util import getPrincipalForUserId, getPrincipalFolder
try:
import config
except ImportError:
config = None
log = logging.getLogger('cco.member.browser')
_ = MessageFactory('cco.member')
template = ViewPageTemplateFile('auth.pt')
def validateToken(token, secret=None):
if not secret:
secret = JWT_SECRET
try:
header, claims = jwt.verify_jwt(token, secret, ['PS256'])
except (jwt._JWTError, jws.InvalidJWSSignature,
jws.InvalidJWSObject, ValueError):
return False
return True
class LoginConcept(ConceptView):
@Lazy
def macro(self):
return template.macros['login_form']
class LoginForm(NodeView):
@Lazy
def macro(self):
return template.macros['login_form']
@Lazy
def item(self):
return self
@Lazy
def isVisible(self):
return self.isAnonymous
def update(self, topLevel=True):
if 'SUBMIT' in self.request.form and not self.isAnonymous:
self.request.response.redirect(self.topMenu.url)
return False
return True
class TanForm(LoginForm):
@Lazy
def macro(self):
return template.macros['tan_form']
@Lazy
def credentials(self):
return getCredentials(self.request)
def sendTanEmail(self):
if self.credentials is None:
log.warn('credentials missing')
return None
person = None
cred = self.credentials
principal = getPrincipalFromCredentials(
self.context, self.request, cred)
if principal is not None:
person = adapted(getPersonForUser(
self.context, self.request, principal))
if person is None: # invalid credentials
log.warn('invalid credentials: %s, %s' % (cred.login, cred.tan))
# TODO: display message
return None
tan = self.credentials.tan
recipient = getattr(person, 'tan_email', None) or person.email
recipients = [recipient]
lang = self.languageInfo.language
subject = translate(_(u'tan_mail_subject'), target_language=lang)
message = translate(_(u'tan_mail_text_$tan', mapping=dict(tan=tan)),
target_language=lang)
senderInfo = self.globalOptions('email.sender')
sender = senderInfo and senderInfo[0] or 'info@loops.cy55.de'
sender = sender.encode('UTF-8')
msg = MIMEText(message.encode('UTF-8'), 'plain', 'UTF-8')
msg['Subject'] = subject.encode('UTF-8')
msg['From'] = sender
msg['To'] = ', '.join(recipients)
mailhost = component.getUtility(IMailDelivery, 'Mail')
mailhost.send(sender, recipients, msg.as_string())
return recipient
class Logout(object):
implements(ILogout)
def __init__(self, context, request):
self.context = context
self.request = request
def __call__(self):
nextUrl = self.request.get('nextURL') or self.request.URL[-1]
if not IUnauthenticatedPrincipal.providedBy(self.request.principal):
auth = component.getUtility(IAuthentication)
ILogout(auth).logout(self.request)
return self.request.response.redirect(nextUrl)
class LogoutView(NodeView):
@Lazy
def body(self):
nextUrl = self.topMenu.url
if not IUnauthenticatedPrincipal.providedBy(self.request.principal):
auth = component.getUtility(IAuthentication)
ILogout(auth).logout(self.request)
return self.request.response.redirect(nextUrl)
class Unauthorized(ConceptView):
isTopLevel = True
def __init__(self, context, request):
self.context = context
self.request = request
def __call__(self):
response = self.request.response
response.setStatus(403)
# make sure that squid does not keep the response in the cache
response.setHeader('Expires', 'Mon, 26 Jul 1997 05:00:00 GMT')
response.setHeader('Cache-Control', 'no-store, no-cache, must-revalidate')
response.setHeader('Pragma', 'no-cache')
if self.nodeView is None:
v = DefaultUnauth(self.context, self.request)
return v()
url = self.nodeView.topMenu.url
if self.isAnonymous:
response.redirect(url)
else:
response.redirect(url + '/unauthorized')
class PasswordChange(NodeView, Form):
interface = IPasswordChange
message = _(u'message_password_changed')
formErrors = dict(
confirm_nomatch=FormError(_(u'error_password_confirm_nomatch')),
wrong_oldpw=FormError(_(u'error_password_wrong_oldpw')),
invalid_pw=FormError(_(u'error_password_invalid_pw')),
)
label = label_submit = _(u'label_change_password')
@Lazy
def macro(self):
return schema_macros.macros['form']
@Lazy
def item(self):
return self
@Lazy
def data(self):
return dict(oldPassword=u'', password=u'', passwordConfirm=u'')
def update(self):
form = self.request.form
if not form.get('action'):
return True
formState = self.formState = self.validate(form)
if formState.severity > 0:
return True
pw = form.get('password')
if not checkPassword(pw):
fi = formState.fieldInstances['password']
fi.setError('invalid_pw', self.formErrors)
formState.severity = max(formState.severity, fi.severity)
return True
pwConfirm = form.get('passwordConfirm')
if pw != pwConfirm:
fi = formState.fieldInstances['password']
fi.setError('confirm_nomatch', self.formErrors)
formState.severity = max(formState.severity, fi.severity)
return True
oldPw = form.get('oldPassword')
regMan = IMemberRegistrationManager(self.loopsRoot)
principal = self.request.principal
result = regMan.changePassword(principal, oldPw, pw)
if not result:
fi = formState.fieldInstances['oldPassword']
fi.setError('wrong_oldpw', self.formErrors)
formState.severity = max(formState.severity, fi.severity)
return True
url = '%s?error_message=%s' % (self.url, self.message)
self.request.response.redirect(url)
return False
def validate(self, data):
formState = FormState()
for f in self.schema.fields:
fi = f.getFieldInstance()
value = data.get(f.name)
fi.validate(value, data)
formState.fieldInstances.append(fi)
formState.severity = max(formState.severity, fi.severity)
return formState
class PasswordReset(PasswordChange):
interface = IPasswordReset
message = _(u'message_password_reset_successfully')
reset_mail_message = _(u'message_password_reset_mail')
formErrors = dict(
invalid_pw=FormError(_(u'error_password_invalid_pw')),
invalid_token=FormError(_(u'error_reset_token_invalid')),
invalid_username=FormError(_(u'error_username_invalid')),
)
label = label_submit = _(u'label_reset_password')
password_reset_period = 15
ignoreRedirect = False
@Lazy
def macro(self):
return template.macros['reset_form']
@Lazy
def item(self):
return self
@Lazy
def data(self):
return dict(password=u'')
@Lazy
def token(self):
return self.request.form.get('token')
@Lazy
def fields(self):
result = super(PasswordReset, self).fields
if self.token and validateToken(self.token):
result = [r for r in result if r.name == 'password']
else:
result = [r for r in result if r.name == 'username']
return result
def getSubject(self, lang=None, domain=None):
if not lang:
lang = self.languageInfo.language
if not domain:
try:
domain = config.baseDomain
except:
domain = self.request.getHeader('HTTP_HOST')
result = translate(_(u'pw_reset_mail_subject_$domain',
mapping=dict(domain=domain)),
target_language=lang)
return result
def getMessage(self, token, lang=None):
if not lang:
lang = self.languageInfo.language
reset_url = '%s?token=%s' % (self.request.getURL(), token)
message = translate(_(u'pw_reset_mail_text_$link',
mapping=dict(link=reset_url)),
target_language=lang)
return message
def sendPasswordResetMail(self, sender, recipients=[], subject='',
message=''):
msg = MIMEText(message.encode('UTF-8'), 'html', 'UTF-8')
msg['Subject'] = subject.encode('UTF-8')
msg['From'] = sender
msg['To'] = ', '.join(recipients)
mailhost = component.getUtility(IMailDelivery, 'Mail')
mailhost.send(sender, recipients, msg.as_string())
def validateToken(self, token, secret=None):
return validateToken(token, secret)
def update(self):
form = self.request.form
if not form.get('action'):
return True
principal = self.request.principal
if principal and principal.id != 'zope.anybody':
return True
formState = self.formState = self.validate(form)
if formState.severity > 0:
return True
token = form.get('token')
secret = JWT_SECRET
if token:
if not validateToken(token):
fi = formState.fieldInstances['password']
fi.setError('invalid_token', self.formErrors)
formState.severity = max(formState.severity, fi.severity)
return True
header, claims = jwt.verify_jwt(token, secret, ['PS256'])
username = claims.get('username')
principal = getPrincipalForUsername(username, self.context,
self.request)
if not principal:
fi = formState.fieldInstances['password']
fi.setError('invalid_username', self.formErrors)
formState.severity = max(formState.severity, fi.severity)
return True
pw = form.get('password')
if not checkPassword(pw):
fi = formState.fieldInstances['password']
fi.setError('invalid_pw', self.formErrors)
formState.severity = max(formState.severity, fi.severity)
return True
principal.setPassword(pw)
else:
username = form.get('username')
principal = getPrincipalForUsername(username, self.context,
self.request)
person = getPersonForUser(self.context, self.request, principal)
if not person:
fi = formState.fieldInstances['username']
fi.setError('invalid_username', self.formErrors)
formState.severity = max(formState.severity, fi.severity)
return True
person = adapted(person)
payload = dict(username=username)
token = jwt.generate_jwt(payload, secret, 'PS256',
timedelta(minutes=getattr(
self, 'password_reset_period', 15)))
addr = self.getMailAddress(person)
if addr:
recipients = [addr]
else:
fi = formState.fieldInstances['username']
fi.setError('invalid_username', self.formErrors)
formState.severity = max(formState.severity, fi.severity)
return True
lang = self.languageInfo.language
try:
domain = config.baseDomain
except:
domain = self.request.getHeader('HTTP_HOST')
senderInfo = self.globalOptions('email.sender')
sender = senderInfo and senderInfo[0] or 'info@loops.cy55.de'
sender = sender.encode('UTF-8')
self.sendPasswordResetMail(sender, recipients, self.getSubject(),
self.getMessage(token))
url = '%s?error_message=%s' % (self.url, self.reset_mail_message)
if not self.ignoreRedirect:
self.request.response.redirect(url)
return False
url = '%s?error_message=%s' % (self.url, self.message)
self.request.response.redirect(url)
return False
def getMailAddress(self, person):
return person.email
def validate(self, data):
formState = FormState()
for f in self.schema.fields:
fi = f.getFieldInstance()
value = data.get(f.name)
fi.validate(value, data)
formState.fieldInstances.append(fi)
formState.severity = max(formState.severity, fi.severity)
return formState

81
cco/member/configure.zcml Normal file
View file

@ -0,0 +1,81 @@
<configure
xmlns:zope="http://namespaces.zope.org/zope"
xmlns:browser="http://namespaces.zope.org/browser"
xmlns:i18n="http://namespaces.zope.org/i18n"
i18n_domain="cco.member">
<i18n:registerTranslations directory="locales" />
<!-- authentication -->
<zope:adapter
name="auth"
for="* zope.publisher.interfaces.IRequest"
provides="zope.traversing.interfaces.ITraversable"
factory="cco.member.auth.AuthURLNameSpace" />
<zope:utility
name="cco.member Session Credentials"
provides="zope.app.authentication.interfaces.ICredentialsPlugin"
factory="cco.member.auth.SessionCredentialsPlugin" />
<!--<zope:class class="cco.member.auth.SessionCredentialsPlugin">
<require
permission="zope.ManageServices"
interface="zope.app.authentication.session.IBrowserFormChallenger"
set_schema="zope.app.authentication.session.IBrowserFormChallenger" />
</zope:class>-->
<!-- views -->
<browser:page for="loops.interfaces.INode"
name="login.html"
class="cco.member.browser.LoginForm"
permission="zope.View" />
<browser:page for="loops.interfaces.INode"
name="2fa_tan_form.html"
class="cco.member.browser.TanForm"
permission="zope.View" />
<!-- <browser:page for="loops.interfaces.INode"
name="logout.html"
class="cco.member.browser.Logout"
permission="zope.View" />-->
<browser:page for="loops.interfaces.INode"
name="logout_view"
class="cco.member.browser.LogoutView"
permission="zope.View" />
<zope:adapter
name="login.html"
for="loops.interfaces.IConcept
zope.publisher.interfaces.browser.IBrowserRequest"
provides="zope.interface.Interface"
factory="cco.member.browser.LoginConcept"
permission="zope.View" />
<browser:page
for="loops.interfaces.INode"
name="cco_change_password.html"
class="cco.member.browser.PasswordChange"
permission="zope.View" />
<browser:page
for="loops.interfaces.INode"
name="cco_reset_password.html"
class="cco.member.browser.PasswordReset"
permission="zope.View" />
<!-- webapi -->
<zope:adapter
name="member_api_users"
for="loops.interfaces.ITypeConcept
zope.publisher.interfaces.http.IHTTPRequest"
provides="zope.interface.Interface"
factory="cco.member.webapi.Users"
permission="cco.webapi.Post" />
</configure>

61
cco/member/interfaces.py Normal file
View file

@ -0,0 +1,61 @@
#
# Copyright (c) 2016 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Interfaces for member registration, password change, etc.
"""
from zope.i18nmessageid import MessageFactory
from zope.interface import Interface
from zope import schema
_ = MessageFactory('cco.member')
class IPasswordChange(Interface):
oldPassword = schema.Password(title=_(u'label_old_password'),
description=_(u'desc_old_password'),
required=True,)
password = schema.Password(title=_(u'label_new_password'),
description=_(u'desc_new_password'),
required=True,)
passwordConfirm = schema.Password(title=_(u'label_confirm_new_password'),
description=_(u'desc_confirm_new_password'),
required=True,)
oldPassword.nostore = True
password.nostore = True
passwordConfirm.nostore = True
class IPasswordReset(Interface):
username = schema.TextLine(title=_(u'label_username'),
description=_(u'desc_usernam'),
required=False,)
password = schema.Password(title=_(u'label_new_password'),
description=_(u'desc_new_password'),
required=False,)
username.nostore = True
password.nostore = True

View file

@ -0,0 +1,16 @@
msgid ""
msgstr ""
"Project-Id-Version: 1.0\n"
"POT-Creation-Date: 2015-11-23 12:00 CET\n"
"PO-Revision-Date: 2015-11-23 12:00 CET\n"
"Last-Translator: Helmut Merz <helmutm@cy55.de>\n"
"Language-Team: cco.member developers <helmutm@cy55.de>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: kwrite\n"
msgid "Login"
msgstr ""

Binary file not shown.

View file

@ -0,0 +1,121 @@
msgid ""
msgstr ""
"Project-Id-Version: 1.0\n"
"POT-Creation-Date: 2015-11-23 12:00 CET\n"
"PO-Revision-Date: 2020-03-24 12:00+0100\n"
"Last-Translator: Helmut Merz <helmutm@cy55.de>\n"
"Language-Team: STEG developers <helmutm@cy55.de>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: sublime_text\n"
# authentication
msgid "title_login"
msgstr "Anmeldung"
msgid "description_login"
msgstr "Bitte geben Sie ihre Anmeldedaten ein"
msgid "label_login"
msgstr "Benutzername"
msgid "label_password"
msgstr "Passwort"
msgid "button_login"
msgstr "Anmelden"
msgid "button_login_2factor"
msgstr "Daten eingeben"
msgid "title_login_tan"
msgstr "Anmeldung: TAN-Eingabe"
msgid "message_enter_tan_digits_$email_$a_$b"
msgstr "Ein E-Mail mit einer Transaktionsnummer (TAN) wurde an die Adresse $email gesendet. Bitte geben Sie die $a. und die $b. Ziffer dieser TAN in die untenstehenden Felder ein."
msgid "tan_mail_subject"
msgstr "Ihre TAN für den Zugang"
msgid "tan_mail_text_$tan"
msgstr "TAN: $tan"
msgid "label_tan_$a"
msgstr "TAN Ziffer $a"
msgid "label_tan_$b"
msgstr "TAN Ziffer $b"
msgid "button_login_tan"
msgstr "Anmelden"
msgid "Invalid login data!"
msgstr "Ungültige Anmeldedaten!"
# change password
msgid "label_change_password"
msgstr "Passwort ändern"
msgid "label_old_password"
msgstr "Altes Passwort"
msgid "desc_old_password"
msgstr "Bitte geben Sie Ihr bisheriges Passwort ein."
msgid "label_new_password"
msgstr "Neues Passwort"
msgid "desc_new_password"
msgstr "Bitte geben Sie das gewünschte neue Passwort ein."
msgid "label_confirm_new_password"
msgstr "Neues Passwort wiederholen"
msgid "desc_confirm_new_password"
msgstr "Bitte wiederholen Sie das neue Passwort."
msgid "message_password_changed"
msgstr "Ihr Passwort wurde geändert. Bitte melden Sie sich mit Ihrem neuen Passwort an."
msgid "error_password_wrong_oldpw"
msgstr "Das eingegebene alte Passwort ist nicht korrekt."
msgid "error_password_confirm_nomatch"
msgstr "Passwort und Passwort-Wiederholung stimmen nicht überein."
msgid "error_password_invalid_pw"
msgstr "Das eingegebene Passwort entspricht nicht den Anforderungen an ein sicheres Passwort: mindestens acht Zeichen, Groß- und Kleinbuchstaben, mindestens eine Ziffer oder ein Sonderzeichen."
# password reset
msgid "pw_reset_mail_text_$link"
msgstr ""
"Bitte nutzen Sie folgenden Link für um Ihr Passwort zurückzusetzen:<br />"
"<a href=\"$link\">Passwort zurücksetzen</a>"
msgid "pw_reset_mail_subject_$domain"
msgstr "Ihre Passwort zurücksetzen Anforderung auf $domain"
msgid "message_password_reset_successfully"
msgstr "Ihr Passwort wurde erfolgreich zurückgesetzt. Sie können sich ab sofort mit Ihrem neuen Passwort einloggen."
msgid "message_password_reset_mail"
msgstr "Sie erhalten zeitnah eine E-Mail mit dem Passwort Reset Link."
msgid "label_reset_password"
msgstr "Passwort (zurück)setzen"
msgid "error_username_invalid"
msgstr "Ungültiger Benutzername/ E-Mail"
msgid "error_reset_token_invalid"
msgstr "Reset Token ungültig/ abgelaufen"
msgid "label_username"
msgstr "Benutzername/ E-Mail"
msgid "label_forgot_password"
msgstr "Passwort vergessen?"

37
cco/member/pwpolicy.py Normal file
View file

@ -0,0 +1,37 @@
#
# Copyright (c) 2016 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Check passwords for conformance to password policy.
"""
def checkPassword(pw):
if len(pw) < 8:
return False
safety = dict(upper=False, lower=False, nonalpha=False)
for c in pw:
if ord(c) > 128:
return False
if c.isupper():
safety['upper'] = True
elif c.islower():
safety['lower'] = True
else:
safety['nonalpha'] = True
return False not in safety.values()

11
cco/member/testing/user_post.sh Executable file
View file

@ -0,0 +1,11 @@
#user_post.sh
url="http://localhost:11080/sites/steg/views/webapi/users" # local dev
#url="http://fms.steg.cy55.de/api/users" # STEG internal
login="api"
password="dummy"
data='{"name": "steg.test77"}'
header_ct="Content-Type: application/json"
curl -v -u $login:$password --data-raw "$data" -H "$header_ct" $url

44
cco/member/tests.py Normal file
View file

@ -0,0 +1,44 @@
# cco.member.tests
""" Tests for the 'cco.member' package.
"""
import os
import unittest, doctest
from zope import component
from zope.app.testing.setup import placefulSetUp, placefulTearDown
from zope.publisher.browser import TestRequest
from loops.interfaces import IConceptManager
from loops.tests.setup import TestSite
def setUp(self):
site = placefulSetUp(True)
t = TestSite(site)
concepts, resources, views = t.setup()
loopsRoot = site['loops']
self.globs['loopsRoot'] = loopsRoot
def tearDown(self):
placefulTearDown()
class Test(unittest.TestCase):
"Basic tests."
def testBasicStuff(self):
pass
def test_suite():
flags = doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS
return unittest.TestSuite((
unittest.makeSuite(Test),
doctest.DocFileSuite('README.txt', optionflags=flags,
setUp=setUp, tearDown=tearDown),
))
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')

14
cco/member/webapi.py Normal file
View file

@ -0,0 +1,14 @@
#
# cco.member.webapi
#
from cco.webapi.server import TypeHandler
class Users(TypeHandler):
def create(self):
data = self.getInputData()
print '***', data
#create_or_update_object(self.loopsRoot, 'person', data)
return self.success()

23
pyproject.toml Normal file
View file

@ -0,0 +1,23 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "loops-ext"
version = "3.0.1"
description = "extensions for the loops web application platform"
readme = "README.md"
license = {text = "MIT"}
keywords = ["loops"]
authors = [{name = "Helmut Merz", email = "helmutm@cy55.de"}]
dependencies = [
"loops"
]
[project.optional-dependencies]
test = ["zope.testrunner"]
[tool.setuptools]
packages = ["cco"]

5
runtests.sh Executable file
View file

@ -0,0 +1,5 @@
# runtests.sh
# run all unit / doc tests
zope-testrunner --test-path=. $*