Source code for khoros.auth
# -*- coding: utf-8 -*-
"""
:Module: khoros.auth
:Synopsis: This module handles authentication-related tasks and operations for the Khoros Community APIs
:Usage: ``import khoros.auth``
:Example: ``session_key = khoros.auth(KhorosObject)``
:Created By: Jeff Shurtliff
:Last Modified: Jeff Shurtliff
:Modified Date: 14 Jun 2023
"""
import requests
from defusedxml import ElementTree
from . import api, errors
from .utils import core_utils, log_utils
# Initialize the logger for this module
logger = log_utils.initialize_logging(__name__)
[docs]
def get_session_key(khoros_object, username=None, password=None):
"""This function retrieves the session key for an authentication session.
.. versionchanged:: 5.3.0
Added logging error messages when exceptions are raised.
.. versionchanged:: 5.0.0
The error handling has been improved to provide more useful information when applicable.
.. versionchanged:: 4.2.0
The URI is now generated utilizing the :py:func:`khoros.auth._get_khoros_login_url` function.
.. versionchanged:: 3.5.0
This function can now be used to authenticate secondary users with either a username and password or with
only a username if calling the function with a previously authenticated user with Administrator privileges.
.. versionchanged:: 3.4.0
Support has been introduced for the ``ssl_verify`` core setting in the :py:class:`khoros.core.Khoros` object.
.. versionchanged:: 3.3.0
Updated ``khoros_object._settings`` to be ``khoros_object.core_settings``.
.. versionchanged:: 2.7.4
The HTTP headers were changed to be all lowercase in order to be standardized across the library.
:param khoros_object: The core Khoros object
:type khoros_object: class[khoros.Khoros]
:param username: The username (i.e. login) of a secondary user to authenticate *(optional)*
:type username: str, None
:param password: The password of a secondary user to authentication *(optional)*
:returns: The session key in string format
:raises: :py:exc:`khoros.errors.exceptions.SessionAuthenticationError`
"""
# Prepare the API call
password = khoros_object.core_settings['session_auth']['password'] if not username else password
username = khoros_object.core_settings['session_auth']['username'] if not username else username
payload = _get_session_key_payload(username, password)
query_string = core_utils.encode_query_string(payload)
# Determine if TLS certificates should be verified during API calls
verify = api.should_verify_tls(khoros_object)
# Perform the API call to authorize the session
uri = f'{_get_khoros_login_url(khoros_object)}/?{query_string}'
secondary_user = False if password else True
header = _get_session_key_header(khoros_object, secondary_user)
response = requests.post(uri, headers=header, verify=verify)
if response.status_code != 200:
if type(response.text) == str and response.text.startswith('<html>'):
api_error = errors.handlers.get_error_from_html(response.text)
error_msg = f"The authentication attempt failed with the following error:\n\t{api_error}"
logger.error(error_msg)
raise errors.exceptions.SessionAuthenticationError(error_msg)
raise errors.exceptions.SessionAuthenticationError()
else:
response = response.json()
if isinstance(response, dict) and 'response' in response and response['response'].get('status') == 'error':
try:
error_msg = response['response']['error']['message']
full_error_msg = f"Session key authentication failed for '{username}': {error_msg}"
logger.error(full_error_msg)
raise errors.exceptions.SessionAuthenticationError(full_error_msg)
except KeyError:
error_msg = f"Failed to retrieve a session key for '{username}'"
logger.error(error_msg)
raise errors.exceptions.SessionAuthenticationError(error_msg)
else:
try:
session_key = response['response']['value']['$']
except KeyError:
error_msg = f"Failed to retrieve a session key for '{username}'"
logger.error(error_msg)
raise errors.exceptions.SessionAuthenticationError(error_msg)
return session_key
[docs]
def get_sso_key(khoros_object):
"""This function retrieves the session key for a LithiumSSO session.
.. versionchanged:: 5.3.0
Added logging error messages when exceptions are raised.
.. versionchanged:: 5.0.0
The two ``if`` statements have been merged.
.. versionadded:: 4.2.0
:param khoros_object: The core Khoros object
:type khoros_object: class[khoros.Khoros]
:returns: The session key in string format
:raises: :py:exc:`khoros.errors.exceptions.SsoAuthenticationError`
"""
khoros_login_url = _get_khoros_login_url(khoros_object)
headers = _get_session_key_header(khoros_object)
response = requests.post(
khoros_login_url,
headers=headers,
data=khoros_object.core_settings.get('sso')
)
tree = ElementTree.fromstring(response.text)
if 'status' in tree.attrib and tree.attrib['status'] == 'success':
return tree.findtext('value')
error_msg = 'Failed to retrieve a session key with the LithiumSSO token.'
logger.error(error_msg)
raise errors.exceptions.SsoAuthenticationError(error_msg)
def _get_khoros_login_url(khoros_object):
"""This function returns the URL for the Khoros login endpoint.
.. versionadded:: 4.2.0
:param khoros_object: The core Khoros object
:type khoros_object: class[khoros.Khoros]
:returns: The URL string
"""
community_url = khoros_object.core_settings.get('community_url')
return f'{community_url}/restapi/vc/authentication/sessions/login'
def _get_session_key_payload(_username, _password=None, _return_json=True):
"""This function constructs the payload used to request a session key.
.. versionadded:: 3.5.0
:param _username: The username (i.e. login) for the user being authenticated
:type _username: str
:param _password: The password for the user being authenticated
.. note:: A password is not required if authenticating a secondary user with a previously
authenticated Administrator account.
:type _password: str, None
:param _return_json: Determines if the session key should be returned in JSON format (``True`` by default)
:type _return_json: bool
:returns: A dictionary with the authentication request payload
"""
_auth_payload = {'user.login': _username}
if _password:
_auth_payload.update({'user.password': _password})
if _return_json:
_auth_payload.update({'restapi.response_format': 'json'})
return _auth_payload
def _get_session_key_header(_khoros_object, _secondary=False):
"""This function retrieves the header for an API call to retrieve a session key.
.. versionchanged:: 5.3.0
Added logging error messages when exceptions are raised.
.. versionadded:: 3.5.0
:param _khoros_object: The core Khoros object
:type _khoros_object: class[khoros.Khoros]
:param _secondary: Indicates that the authentication request is for a secondary user (``False`` by default)
:returns: A dictionary with the header information
:raises: :py:exc:`khoros.errors.exceptions.SessionAuthenticationError`
"""
_header = {"content-type": "application/x-www-form-urlencoded"}
if _secondary:
if _khoros_object.auth.get('header'):
_header.update(_khoros_object.auth.get('header'))
else:
_error_msg = "Unable to retrieve a session key for secondary users without an existing session key"
logger.error(_error_msg)
raise errors.exceptions.SessionAuthenticationError(_error_msg)
return _header
[docs]
def invalidate_session(khoros_object, user_id=None, sso_id=None):
"""This function invalidates an active authentication session.
.. versionchanged:: 3.3.0
Updated ``khoros_object._settings`` to be ``khoros_object.core_settings``.
:param khoros_object: The core Khoros object
:type khoros_object: class[khoros.Khoros]
:param user_id: The User ID of the service account (Lithium Registration)
:type user_id: str, int, None
:param sso_id: The SSO ID of the service account (Single Sign-On)
:type sso_id: str, int, None
:returns: Boolean value defining if the session was invalidated successfully
:raises: :py:exc:`ValueError`, :py:exc:`khoros.errors.exceptions.APIConnectionError`,
:py:exc:`khoros.errors.exceptions.POSTRequestError`,
:py:exc:`khoros.errors.exceptions.PayloadMismatchError`
"""
session_terminated = False
payload = {}
if sso_id:
payload['sso_id'] = sso_id
elif user_id:
payload['id'] = user_id
else:
username = khoros_object.core_settings['session_auth']['username']
user_id = khoros_object.search('id', 'users', f'login = "{username}"')
user_id = user_id['data']['items'][0]['id']
payload['id'] = user_id
query_url = f"{khoros_object.core_settings['v2_base']}/auth/signout"
headers = {'content-type': 'application/json'}
response = api.post_request_with_retries(query_url, payload, return_json=True,
auth_dict=khoros_object.auth, headers=headers)
if response['status'] == "success":
if response['data']['signed_off_all_sessions'] is True:
session_terminated = True
print("The authentication session has been terminated.\n")
return session_terminated
[docs]
def get_oauth_authorization_url(khoros_object):
"""This function constructs the authorization URL needed for the OAuth 2.0 Web Application Flow.
.. versionchanged:: 3.3.0
Updated ``khoros_object._settings`` to be ``khoros_object.core_settings``.
:param khoros_object: The core Khoros object
:type khoros_object: class[khoros.Khoros]
:returns: The properly encoded authorization URL in string format
"""
community_url = khoros_object.core_settings['community_url']
query_string = core_utils.encode_query_string({
'client_id': khoros_object.core_settings['oauth2']['client_id'],
'redirect_uri': khoros_object.core_settings['oauth2']['redirect_url'],
'response_type': 'code',
'state': core_utils.get_random_string()
}, no_encode='redirect_uri')
authorization_url = f"{community_url}/auth/oauth2/authorize?{query_string}"
return authorization_url
[docs]
def get_oauth_callback_url_from_user(khoros_object):
"""This function instructs the end-user to visit the authorization URL and provide the full callback URL.
:param khoros_object: The core Khoros object
:type khoros_object: class[khoros.Khoros]
:returns: The callback URL supplied by the end-user
"""
authorization_url = get_oauth_authorization_url(khoros_object)
print("Please visit this URL to authorize access: ", authorization_url)
callback_url = input("Enter the full URL in the address bar after visiting the URL above: ")
if not callback_url.startswith('http'):
raise errors.exceptions.InvalidCallbackURLError(val=callback_url)
return callback_url