# -*- coding: utf-8 -*-
"""
:Module: khoros.utils.core_utils
:Synopsis: Collection of supporting utilities and functions to complement the primary modules
:Usage: ``from khoros.utils import core_utils``
:Example: ``encoded_string = core_utils.encode_url(decoded_string)``
:Created By: Jeff Shurtliff
:Last Modified: Jeff Shurtliff
:Modified Date: 23 May 2022
"""
import os
import base64
import random
import string
import warnings
import subprocess
import urllib.parse
from html import unescape
from .. import errors
from . import log_utils
# Initialize the logger for this module
logger = log_utils.initialize_logging(__name__)
[docs]
def url_encode(raw_string):
    """Encode a string so that it can be safely embedded in a URL.

    :param raw_string: The raw string to be encoded
    :type raw_string: str
    :returns: The URL-encoded string (spaces are represented as ``+``)
    """
    encoded_string = urllib.parse.quote_plus(raw_string)
    return encoded_string
[docs]
def url_decode(encoded_string):
    """Reverse URL-encoding on a string, turning ``+`` and ``%XX`` sequences back into characters.

    :param encoded_string: The url-encoded string
    :type encoded_string: str
    :returns: The decoded string
    """
    decoded_string = urllib.parse.unquote_plus(encoded_string)
    return decoded_string
[docs]
def decode_html_entities(html_string):
    """Replace HTML entities (e.g. ``&amp;``, ``&#39;``, etc.) with the characters they represent.

    :param html_string: The string containing HTML entities to be decoded
    :type html_string: str
    :returns: The string with its HTML entities decoded
    """
    decoded_string = unescape(html_string)
    return decoded_string
[docs]
def decode_binary(binary):
    """Decode a bytes-like object into a string using the UTF-8 codec.

    .. versionadded:: 2.6.0

    :param binary: The binary to be decoded
    :returns: The decoded string
    :raises: :py:exc:`TypeError`, :py:exc:`ValueError`
    """
    decoded_string = binary.decode('utf-8')
    return decoded_string
[docs]
def encode_base64(object_to_encode, str_encoding='utf-8', url_encode_object=False, return_bytes=False):
    """This function encodes a string or bytes-like object as base64.

    .. versionadded:: 3.0.0

    :param object_to_encode: The string or bytes-like object to encode as base64
    :param str_encoding: Defines the encoding (``utf-8`` by default) used to convert a string
                         input into bytes before it is base64-encoded
    :type str_encoding: str
    :param url_encode_object: Determines if the base64 string should be url-encoded (``False`` by default)
    :type url_encode_object: bool
    :param return_bytes: Determines if the base64-encoded object should be returned as a bytes-like object rather
                         than a string (``False`` by default)
    :type return_bytes: bool
    :returns: The encoded object as a string or bytes-like object
    :raises: :py:exc:`TypeError`
    """
    if isinstance(object_to_encode, str):
        object_to_encode = object_to_encode.encode(str_encoding)
    base64_object = base64.b64encode(object_to_encode)
    if not return_bytes:
        # Base64 output consists solely of ASCII characters, so it must be decoded as ASCII
        # regardless of the encoding that was used for the input string. Decoding it with a
        # non-ASCII-compatible codec (e.g. UTF-16) would produce a garbled result.
        base64_object = base64_object.decode('ascii')
    if url_encode_object:
        # NOTE: URL-encoding produces a string, so the result is a string in this case
        # even when ``return_bytes`` is enabled.
        base64_object = url_encode(base64_object)
    return base64_object
[docs]
def run_cmd(cmd, return_type='dict', shell=False, decode_output=True, strip_output=False,
            exclude_stdout=False, exclude_stderr=False, exclude_return_code=False):
    """This function executes a shell command on the operating system.

    .. versionchanged:: 3.5.0
       The default value of the ``shell`` parameter has been changed to ``False`` to avoid unnecessary
       `security <https://bandit.readthedocs.io/en/latest/plugins/b602_subprocess_popen_with_shell_equals_true.html>`_
       risk and added a logged warning if the value is manually set to ``True``.

    .. versionadded:: 2.5.1

    :param cmd: The command to be executed (passed directly to :py:func:`subprocess.run`)
    :type cmd: str, list
    :param return_type: Determines the format in which the results should be returned: ``dict`` (default),
                        ``list`` or ``tuple``
    :type return_type: str
    :param shell: Determines if the ``shell`` argument in the :py:func:`subprocess.run` function should be ``True``
    :type shell: bool
    :param decode_output: Determines if the binary output should be decoded as a UTF-8 string (``True`` by default)
    :type decode_output: bool
    :param strip_output: Determines if the escape character(s) should be stripped from the output (``False`` by default)
    :type strip_output: bool
    :param exclude_stdout: Determines if the ``stdout`` output should be excluded (``False`` by default)
    :type exclude_stdout: bool
    :param exclude_stderr: Determines if the ``stderr`` output should be excluded (``False`` by default)
    :type exclude_stderr: bool
    :param exclude_return_code: Determines if the return code from the command should be excluded (``False`` by default)
    :type exclude_return_code: bool
    :returns: The results from the executed script
    :raises: :py:exc:`khoros.errors.exceptions.MissingRequiredDataError`, :py:exc:`ValueError`
    """
    if exclude_stdout and exclude_stderr and exclude_return_code:
        raise errors.exceptions.MissingRequiredDataError("At least one output type must be enabled.")

    # Validate the return type up front so an invalid argument never causes the command to be executed
    if return_type not in ('dict', 'list', 'tuple'):
        raise ValueError(f"'{return_type}' is not a valid return type.")

    if shell:
        warn_msg = "It is recommended that the shell parameter be set to False to avoid introducing risk of " \
                   "shell injection attacks in your code."
        logger.warning(warn_msg)

    output = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell)
    results = {
        'stdout': output.stdout,
        'stderr': output.stderr,
        'return_code': output.returncode
    }

    # Only the two output streams are decoded and/or stripped; the return code is left untouched
    for stream in ('stdout', 'stderr'):
        if decode_output:
            results[stream] = decode_binary(results[stream])
        if strip_output:
            results[stream] = results[stream].strip()

    # Drop any output types that the caller asked to exclude while preserving the original ordering
    excluded_types = {'stdout': exclude_stdout, 'stderr': exclude_stderr, 'return_code': exclude_return_code}
    results = {output_type: value for output_type, value in results.items() if not excluded_types[output_type]}

    if return_type == 'list':
        return list(results.values())
    if return_type == 'tuple':
        return tuple(results.values())
    return results
def _is_zero_length(_element):
"""This function checks to see if an element has a zero length.
:param _element: The element of which the length will be checked
:returns: Boolean value stating whether or not the length of the element is zero
"""
return True if len(_element) == 0 else False
def _structure_query_string(_url_dict, _no_encode):
"""This function constructs a query string where one or more fields must not be URL-encoded.
:param _url_dict: Dictionary of URL query string keys and values
:type _url_dict: dict
:param _no_encode: Designates any dictionary keys (i.e. field names) whose values should not be URL-encoded
:type _no_encode: list, tuple, set, str, None
:returns: The URL query string in string format
"""
if type(_no_encode) == str:
_no_encode = (_no_encode, )
_delimiters = {True: "", False: "&"}
_query_string = ""
for _field_name, _field_value in _url_dict.items():
if _field_name not in _no_encode:
_field_value = url_encode(_field_value)
_delimiter = _delimiters.get(_is_zero_length(_query_string))
_query_string = f"{_query_string}{_delimiter}{_field_name}={_field_value}"
return _query_string
[docs]
def encode_query_string(url_dict, no_encode=None, json_payload=False):
    """Build a URL query string from a dictionary of parameters.

    .. versionchanged:: 3.2.0
       Introduced the ability to pass the query parameters as JSON payload to avoid URI length limits.

    :param url_dict: Dictionary of URL query string keys and values
    :type url_dict: dict
    :param no_encode: Designates any dictionary keys (i.e. field names) whose values should not be URL-encoded
    :type no_encode: list, tuple, set, str, None
    :param json_payload: Determines if query parameters should be passed as JSON payload rather than in the URI
                         (``False`` by default)
    :type json_payload: bool
    :returns: The URL query string in string format
    """
    if json_payload:
        # Only the field names appear in the URI because the values travel in the JSON payload
        return "&".join(f"{field}" for field in url_dict.keys())
    if no_encode:
        # At least one field must be left unencoded, so the query string is assembled manually
        return _structure_query_string(url_dict, no_encode)
    return urllib.parse.urlencode(url_dict)
[docs]
def is_numeric(value):
    """Determine whether a value is numeric, either as an integer or as a numeric string.

    .. versionadded:: 2.3.0

    :param value: The value to be examined
    :type value: str, int
    :returns: Boolean value indicating if the examined value is numeric
    """
    value_type = type(value)
    # The exact type is compared on purpose so that Boolean values are not treated as integers
    if value_type is int:
        return True
    if value_type is str:
        return value.isnumeric()
    return False
[docs]
def convert_set(iterable, convert_to='list'):
    """Cast a ``set`` to a ``list`` (or ``tuple``) so that it becomes subscriptable.

    :param iterable: The iterable to be evaluated to see if it has a ``set`` type
    :param convert_to: Defines if the iterable should be cast to a ``list`` (default) or a ``tuple``
    :type convert_to: str
    :returns: The converted variable as a ``list`` or ``tuple`` (or untouched if not a ``set``)
    """
    # Anything other than an actual set is handed back unchanged
    if type(iterable) is not set:
        return iterable
    target_type = tuple if convert_to == 'tuple' else list
    return target_type(iterable)
[docs]
def convert_single_value_to_tuple(value):
    """Wrap a single value of nearly any type in a one-element tuple.

    .. versionchanged:: 3.2.0
       The function has been aesthetically updated to be more PEP8 compliant.

    .. versionadded:: 2.3.0

    :param value: The value to convert into a tuple
    :returns: A tuple whose only element is the supplied value
    """
    return (value,)
[docs]
def convert_string_to_tuple(value, delimiter=''):
    """This function converts a value to a tuple if in string format.

    .. versionchanged:: 3.5.0
       The typecheck has been updated to use ``isinstance`` and the function can now split delimited strings as needed.

    .. versionadded:: 2.3.0

    :param value: The potential string to convert
    :type value: str
    :param delimiter: The value (e.g. ``,``) used to separate values in a delimited string (empty by default)
    :type delimiter: str
    :returns: The tuple (if original value was in string format) or the original value/type
    """
    if not isinstance(value, str):
        return value
    if delimiter and delimiter in value:
        # str.split() returns a list, so the result is cast to honor the documented tuple return type
        return tuple(value.split(delimiter))
    return (value,)
[docs]
def is_iterable(var):
    """Identify whether a given variable is one of the supported iterable types.

    The supported types are ``list``, ``tuple``, ``set`` and the dictionary key and value views.

    .. versionadded:: 3.5.0

    :param var: The variable to check
    :returns: A boolean value indicating whether or not the variable is an iterable
    """
    iterable_types = (list, tuple, set, type({}.keys()), type({}.values()))
    return isinstance(var, iterable_types)
[docs]
def get_random_string(length=32, prefix_string=""):
    """This function returns a random alphanumeric string to use as a salt or password.

    The characters are drawn from the operating system's source of randomness because the
    default pseudo-random generator is predictable and therefore unsuitable for secrets.

    :param length: The length of the random portion of the string (``32`` by default)
    :type length: int
    :param prefix_string: A string to which the salt should be appended (optional)
    :type prefix_string: str
    :returns: The alphanumeric string
    """
    alphabet = string.ascii_letters + string.digits
    # SystemRandom is backed by os.urandom() and is unaffected by random.seed()
    secure_random = random.SystemRandom()
    random_chars = ''.join(secure_random.choice(alphabet) for _ in range(length))
    return f"{prefix_string}{random_chars}"
[docs]
def display_warning(warn_msg):
    """Emit a :py:exc:`UserWarning` message through the :py:mod:`warnings` module.

    .. versionchanged:: 5.0.0
       Removed the redundant return statement.

    .. versionadded:: 2.1.0

    :param warn_msg: The message to be displayed
    :type warn_msg: str
    :returns: None
    """
    warnings.warn(message=warn_msg, category=UserWarning)
[docs]
def get_file_type(file_path):
    """Attempt to identify whether a given file path refers to a YAML or JSON file.

    The file extension is checked first. When the extension is not recognized, a warning is
    displayed and the non-comment lines of the file are searched for a ``{`` character, in
    which case the file is considered to be JSON.

    .. versionadded:: 2.2.0

    :param file_path: The full path to the file
    :type file_path: str
    :returns: The file type in string format (e.g. ``yaml`` or ``json``)
    :raises: :py:exc:`FileNotFoundError`, :py:exc:`khoros.errors.exceptions.UnknownFileTypeError`
    """
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"Unable to locate the following file: {file_path}")

    # Recognized extensions settle the question without the file needing to be read
    if file_path.endswith('.json'):
        return 'json'
    if file_path.endswith(('.yml', '.yaml')):
        return 'yaml'

    display_warning(f"Unable to recognize the file type of '{file_path}' by its extension.")
    with open(file_path) as cfg_file:
        for line in cfg_file:
            # Lines beginning with a hash are ignored when looking for the JSON opening brace
            if not line.startswith('#') and '{' in line:
                return 'json'
    raise errors.exceptions.UnknownFileTypeError(file=file_path)
[docs]
def convert_dict_id_values_to_strings(dict_list):
    """This function ensures that the ``id`` keys in a list of dictionaries use string values.

    The supplied dictionaries are not modified; shallow copies are returned instead.

    :param dict_list: List (or tuple) of dictionaries (or a single dictionary) containing API object data
    :type dict_list: list, tuple, dict, None
    :returns: A new dictionary list with properly formatted ``id`` values (empty if ``None`` was supplied)
    :raises: :py:exc:`TypeError`
    """
    # None is a documented input type, so it is treated as having no dictionaries to convert
    if dict_list is None:
        return []
    if isinstance(dict_list, dict):
        dict_list = [dict_list]

    new_dict_list = []
    for single_dict in dict_list:
        if not isinstance(single_dict, dict):
            raise TypeError("The 'dict_list' argument must be a dictionary or a list of dictionaries.")
        # Work on a shallow copy so the caller's data is not mutated as a side effect
        new_dict = single_dict.copy()
        if 'id' in new_dict and not isinstance(new_dict['id'], str):
            new_dict['id'] = str(new_dict['id'])
        new_dict_list.append(new_dict)
    return new_dict_list
[docs]
def convert_dict_list_to_simple_list(dict_list, fields):
    """This function converts a list of dictionaries into a simple list consisting of the provided field(s).

    .. versionadded:: 3.5.0

    :param dict_list: The original list of dictionaries
    :type dict_list: list
    :param fields: The field(s) with which to filter the dictionary list into a simple list
                   (multiple fields may be supplied as a comma-delimited string)
    :type fields: str, tuple, list
    :returns: The simple list of strings or tuples depending on the number of fields found in each
              dictionary, with an empty string used when none of the fields have a value
    """
    # A string is split on commas, which yields a single-item list when no comma is present
    if isinstance(fields, str):
        fields = fields.split(',')

    new_list = []
    for field_dict in dict_list:
        # Fields that are missing or that have a falsy value are left out
        field_list = [field_dict.get(field) for field in fields if field_dict.get(field)]
        if not field_list:
            # Assigning to index zero of an empty list raises an IndexError, so the
            # placeholder value is added by replacing the list instead
            field_list = ['']
        new_field = field_list[0] if len(field_list) == 1 else tuple(field_list)
        new_list.append(new_field)
    return new_list
[docs]
def convert_list_values(values_list, convert_to='str', split_values=False, split_delimiter=','):
    """This function converts the values in a list to a different type.

    :param values_list: The list of values to be converted
    :type values_list: list, tuple, set
    :param convert_to: One of the following types: ``str`` (Default), ``int``, ``float``, ``tuple`` or ``set``
    :type convert_to: str
    :param split_values: Determines if the values should be split with a specific delimiter (``False`` by default)

                         .. note:: This only applies when converting to the ``tuple`` or ``set`` types.

    :type split_values: bool
    :param split_delimiter: The delimiter for which to split the values when applicable (comma by default)
    :type split_delimiter: str
    :returns: A new list of converted values
    :raises: :py:exc:`TypeError`, :py:exc:`ValueError`
    """
    def _to_tuple(_value):
        """Convert a value to a tuple, splitting it on the delimiter first when requested."""
        return tuple(_value.split(split_delimiter)) if split_values else (_value, )

    def _to_set(_value):
        """Convert a value to a set, splitting it on the delimiter first when requested."""
        return set(_value.split(split_delimiter)) if split_values else {_value}

    converters = {'str': str, 'int': int, 'float': float, 'tuple': _to_tuple, 'set': _to_set}
    try:
        converter = converters[convert_to]
    except KeyError:
        # An unsupported type previously resulted in every value being silently discarded
        raise ValueError(f"'{convert_to}' is not a valid conversion type.") from None
    return [converter(value) for value in values_list]
[docs]
def remove_tld(url, strip_anchors=True):
    """This function removes the top-level domain (TLD) from a Khoros Community platform URL.

    Everything preceding the first ``/t5/`` segment is removed from the URL.

    :param url: The URL from which the TLD should be removed
    :type url: str
    :param strip_anchors: Determines if anchors (e.g. ``#top``) should be stripped (``True`` by default)
    :type strip_anchors: bool
    :returns: The URL beginning with ``/t5/``
    :raises: :py:exc:`khoros.errors.exceptions.InvalidURLError`
    """
    if '/t5/' not in url:
        raise errors.exceptions.InvalidURLError('The provided URL is not from the Khoros Community platform.')
    # Split only on the first occurrence so that a later "/t5/" in the path does not truncate the URL
    relative_url = f"/t5/{url.split('/t5/', 1)[1]}"
    if strip_anchors:
        relative_url = relative_url.split('#', 1)[0]
    return relative_url
[docs]
def merge_and_dedup(*data):
    """Merge various data elements into a single, deduplicated list.

    Lists, tuples and sets are flattened by one level while any other value is treated as a
    single item. The order in which items are first encountered is preserved.

    :param data: One or more data elements to merge and deduplicate
    :returns: A merged and deduplicated list of data
    """
    merged = []
    for entry in data:
        # Only exact list, tuple and set instances are expanded into their individual items
        items = entry if type(entry) in (list, tuple, set) else (entry,)
        for candidate in items:
            if candidate in merged:
                continue
            merged.append(candidate)
    return merged