Source code for dokomoforms.handlers.auth

"""Authentication handlers."""

from collections import OrderedDict
from datetime import datetime, timedelta
import urllib.parse
import uuid

from sqlalchemy.orm.exc import NoResultFound

from tornado.escape import json_decode
import tornado.concurrent
import tornado.web
import tornado.gen
import tornado.httpclient

from passlib.hash import bcrypt_sha256

from dokomoforms.options import options
from dokomoforms.handlers.util import BaseHandler, authenticated_admin
from dokomoforms.models import User, Administrator, Email


class Login(BaseHandler):
    """POST here to verify the assertion generated by Mozilla Persona."""

    def _async_post(
            self, http_client, url, input_data
    ) -> tornado.concurrent.Future:
        """Asynchronously POST input_data to the url using http_client.fetch.

        The data is sent form-encoded (urllib.parse.urlencode).

        :param http_client: the HTTP client
        :param url: the URL for POSTing
        :param input_data: the data to POST (a mapping or sequence of pairs)
        :return: a tornado.concurrent.Future that will contain the response
        """
        return tornado.gen.Task(
            http_client.fetch,
            url,
            method='POST',
            body=urllib.parse.urlencode(input_data),
        )

    @tornado.web.asynchronous
    @tornado.gen.engine
    def post(self):
        """POST to the Persona verifier service and log the user in.

        Accepts:
        {
            "assertion": <assertion>
        }

        Then, POSTs the assertion to the verifier configured in
        options.persona_verification_url to check that it is valid. If so,
        attempts to log the user in by e-mail. When no account matches but
        the verified e-mail equals options.admin_email, an Administrator
        account is created for it first.

        On success the "user" secure cookie is set (httponly, and also
        "secure" when options.https is enabled).

        Responds with:
        200 OK
        {
            "email": <e-mail address>
        }

        :raise tornado.web.HTTPError: 400 Bad Request if the assertion is not
                                      verified
                                      422 Unprocessable Entity if the e-mail
                                      address is not associated with a user
                                      account.
        """
        assertion = self.get_argument('assertion')
        http_client = tornado.httpclient.AsyncHTTPClient()
        url = options.persona_verification_url
        # NOTE(review): the audience comes from the client-supplied Host
        # header. The Persona verifier guidance recommends a fixed,
        # server-configured audience instead -- confirm whether a deployment
        # option should be used here.
        input_data = {'assertion': assertion, 'audience': self.request.host}
        response = yield self._async_post(http_client, url, input_data)
        data = json_decode(response.body)
        # A reply without a "status" field is treated like any other
        # unverified assertion (400) rather than surfacing as a KeyError.
        if data.get('status') != 'okay':
            raise tornado.web.HTTPError(400, 'Failed assertion test')

        try:
            user = (
                self.session
                .query(User.id, User.name)
                .join(Email)
                .filter(Email.address == data['email'])
                .one()
            )
        except NoResultFound:
            if data['email'] != options.admin_email:
                _ = self.locale.translate
                # Translate the message template first and interpolate the
                # address afterwards; formatting before translating would
                # make the catalog lookup key depend on the address, so it
                # could never match a translation.
                raise tornado.web.HTTPError(
                    422,
                    reason=_(
                        'There is no account associated with the e-mail'
                        ' address {}'
                    ).format(data['email']),
                )
            # First login of the configured administrator: create the
            # account on the fly.
            with self.session.begin():
                user = Administrator(
                    name=options.admin_name,
                    emails=[Email(address=options.admin_email)]
                )
                self.session.add(user)

        cookie_options = {
            'httponly': True,
        }
        if options.https:
            cookie_options['secure'] = True
        self.set_secure_cookie('user', user.id, **cookie_options)
        self.write({'email': data['email']})
        self.finish()
class Logout(BaseHandler):
    """Endpoint that ends the current session; POST to it to log out."""

    def post(self):
        """Clear the "user" cookie from the client.

        The cookie is flagged httponly, so client-side JavaScript cannot
        remove it; logging out therefore has to go through the server.
        """
        cookie_name = 'user'
        self.clear_cookie(cookie_name)
class GenerateToken(BaseHandler):  # We should probably do this in JS
    """GET a fresh API token here. Every GET replaces the previous token."""

    @authenticated_admin
    def get(self):
        """Issue a new token for the logged-in user and return it.

        Only a bcrypt_sha256 hash of the token is stored on the user model;
        the plain token appears solely in this response. The expiration is
        set to 60 days from now.

        Responds with a JSON object holding "token" and "expires_on" (the
        ISO 8601 form of the stored expiration), in that order.
        """
        plain_token = uuid.uuid4().hex
        account = self.current_user_model
        with self.session.begin():
            hashed = bcrypt_sha256.encrypt(plain_token)
            account.token = hashed.encode()
            lifetime = timedelta(days=60)
            account.token_expiration = datetime.now() + lifetime
        # Read the expiration back from the model (not from a local copy) so
        # the response reflects what the model holds after the transaction.
        payload = OrderedDict()
        payload['token'] = plain_token
        payload['expires_on'] = account.token_expiration.isoformat()
        self.write(payload)
class CheckLoginStatus(BaseHandler):
    """Endpoint the application can POST to in order to check login status."""

    @tornado.web.authenticated
    def post(self):
        """Finish with an empty response when the user is logged in.

        A 2xx status means the request was authenticated. Unauthenticated
        requests are rejected by ``tornado.web.authenticated`` before this
        body runs (stock Tornado answers non-GET/HEAD requests with
        403 Forbidden), so clients should treat any non-2xx status as
        "not logged in".
        """
        self.finish()