From d8de00b9799565974bdc9a0da205e418d40422f7 Mon Sep 17 00:00:00 2001 From: Plamen Valentinov Kolev Date: Fri, 18 Oct 2024 18:06:49 +0200 Subject: [PATCH] Move utility functions to utils.py and tests accordingly. --- copulas/__init__.py | 252 ----------- copulas/bivariate/__init__.py | 2 +- copulas/bivariate/base.py | 3 +- copulas/bivariate/frank.py | 2 +- copulas/datasets.py | 2 +- copulas/errors.py | 5 + copulas/multivariate/base.py | 3 +- copulas/multivariate/gaussian.py | 6 +- copulas/multivariate/tree.py | 2 +- copulas/multivariate/vine.py | 10 +- copulas/univariate/base.py | 6 +- copulas/univariate/gaussian_kde.py | 2 +- copulas/univariate/selection.py | 2 +- copulas/univariate/truncated_gaussian.py | 2 +- copulas/utils.py | 248 +++++++++++ .../{test___init__.py => test_utils.py} | 2 +- tests/numerical/cdf/test_cdf.py | 2 +- tests/numerical/fit/test_fit.py | 2 +- tests/numerical/pdf/test_pdf.py | 2 +- tests/unit/multivariate/test_gaussian.py | 2 +- tests/unit/multivariate/test_tree.py | 2 +- tests/unit/test___init__.py | 413 +----------------- tests/unit/test_utils.py | 413 ++++++++++++++++++ 23 files changed, 698 insertions(+), 687 deletions(-) create mode 100644 copulas/errors.py create mode 100644 copulas/utils.py rename tests/end-to-end/{test___init__.py => test_utils.py} (97%) create mode 100644 tests/unit/test_utils.py diff --git a/copulas/__init__.py b/copulas/__init__.py index 788a1104..bcb2504f 100644 --- a/copulas/__init__.py +++ b/copulas/__init__.py @@ -1,13 +1,9 @@ -# -*- coding: utf-8 -*- - """Top-level package for Copulas.""" __author__ = 'DataCebo, Inc.' 
__email__ = 'info@sdv.dev' __version__ = '0.11.2.dev0' -import contextlib -import importlib import sys import warnings from copy import deepcopy @@ -15,254 +11,6 @@ from operator import attrgetter from types import ModuleType -import numpy as np -import pandas as pd - -EPSILON = np.finfo(np.float32).eps - - -class NotFittedError(Exception): - """NotFittedError class.""" - - -@contextlib.contextmanager -def set_random_state(random_state, set_model_random_state): - """Context manager for managing the random state. - - Args: - random_state (int or np.random.RandomState): - The random seed or RandomState. - set_model_random_state (function): - Function to set the random state on the model. - """ - original_state = np.random.get_state() - - np.random.set_state(random_state.get_state()) - - try: - yield - finally: - current_random_state = np.random.RandomState() - current_random_state.set_state(np.random.get_state()) - set_model_random_state(current_random_state) - np.random.set_state(original_state) - - -def random_state(function): - """Set the random state before calling the function. - - Args: - function (Callable): - The function to wrap around. - """ - - def wrapper(self, *args, **kwargs): - if self.random_state is None: - return function(self, *args, **kwargs) - - else: - with set_random_state(self.random_state, self.set_random_state): - return function(self, *args, **kwargs) - - return wrapper - - -def validate_random_state(random_state): - """Validate random state argument. - - Args: - random_state (int, numpy.random.RandomState, tuple, or None): - Seed or RandomState for the random generator. - - Output: - numpy.random.RandomState - """ - if random_state is None: - return None - - if isinstance(random_state, int): - return np.random.RandomState(seed=random_state) - elif isinstance(random_state, np.random.RandomState): - return random_state - else: - raise TypeError( - f'`random_state` {random_state} expected to be an int ' - 'or `np.random.RandomState` object.' 
- ) - - -def get_instance(obj, **kwargs): - """Create new instance of the ``obj`` argument. - - Args: - obj (str, type, instance): - """ - instance = None - if isinstance(obj, str): - package, name = obj.rsplit('.', 1) - instance = getattr(importlib.import_module(package), name)(**kwargs) - elif isinstance(obj, type): - instance = obj(**kwargs) - else: - if kwargs: - instance = obj.__class__(**kwargs) - else: - args = getattr(obj, '__args__', ()) - kwargs = getattr(obj, '__kwargs__', {}) - instance = obj.__class__(*args, **kwargs) - - return instance - - -def store_args(__init__): - """Save ``*args`` and ``**kwargs`` used in the ``__init__`` of a copula. - - Args: - __init__(callable): ``__init__`` function to store their arguments. - - Returns: - callable: Decorated ``__init__`` function. - """ - - def new__init__(self, *args, **kwargs): - args_copy = deepcopy(args) - kwargs_copy = deepcopy(kwargs) - __init__(self, *args, **kwargs) - self.__args__ = args_copy - self.__kwargs__ = kwargs_copy - - return new__init__ - - -def get_qualified_name(_object): - """Return the Fully Qualified Name from an instance or class.""" - module = _object.__module__ - if hasattr(_object, '__name__'): - _class = _object.__name__ - - else: - _class = _object.__class__.__name__ - - return module + '.' + _class - - -def vectorize(function): - """Allow a method that only accepts scalars to accept vectors too. - - This decorator has two different behaviors depending on the dimensionality of the - array passed as an argument: - - **1-d array** - - It will work under the assumption that the `function` argument is a callable - with signature:: - - function(self, X, *args, **kwargs) - - where X is an scalar magnitude. - - In this case the arguments of the input array will be given one at a time, and - both the input and output of the decorated function will have shape (n,). 
- - **2-d array** - - It will work under the assumption that the `function` argument is a callable with signature:: - - function(self, X0, ..., Xj, *args, **kwargs) - - where `Xi` are scalar magnitudes. - - It will pass the contents of each row unpacked on each call. The input is espected to have - shape (n, j), the output a shape of (n,) - - It will return a function that is guaranteed to return a `numpy.array`. - - Args: - function(callable): Function that only accept and return scalars. - - Returns: - callable: Decorated function that can accept and return :attr:`numpy.array`. - - """ - - def decorated(self, X, *args, **kwargs): - if not isinstance(X, np.ndarray): - return function(self, X, *args, **kwargs) - - if len(X.shape) == 1: - X = X.reshape([-1, 1]) - - if len(X.shape) == 2: - return np.fromiter( - (function(self, *x, *args, **kwargs) for x in X), np.dtype('float64') - ) - - else: - raise ValueError('Arrays of dimensionality higher than 2 are not supported.') - - decorated.__doc__ = function.__doc__ - return decorated - - -def scalarize(function): - """Allow methods that only accepts 1-d vectors to work with scalars. - - Args: - function(callable): Function that accepts and returns vectors. - - Returns: - callable: Decorated function that accepts and returns scalars. - """ - - def decorated(self, X, *args, **kwargs): - scalar = not isinstance(X, np.ndarray) - - if scalar: - X = np.array([X]) - - result = function(self, X, *args, **kwargs) - if scalar: - result = result[0] - - return result - - decorated.__doc__ = function.__doc__ - return decorated - - -def check_valid_values(function): - """Raise an exception if the given values are not supported. - - Args: - function(callable): Method whose unique argument is a numpy.array-like object. - - Returns: - callable: Decorated function - - Raises: - ValueError: If there are missing or invalid values or if the dataset is empty. 
- """ - - def decorated(self, X, *args, **kwargs): - if isinstance(X, pd.DataFrame): - W = X.to_numpy() - - else: - W = X - - if not len(W): - raise ValueError('Your dataset is empty.') - - if not (np.issubdtype(W.dtype, np.floating) or np.issubdtype(W.dtype, np.integer)): - raise ValueError('There are non-numerical values in your data.') - - if np.isnan(W).any().any(): - raise ValueError('There are nan values in your data.') - - return function(self, X, *args, **kwargs) - - return decorated - def _get_addon_target(addon_path_name): """Find the target object for the add-on. diff --git a/copulas/bivariate/__init__.py b/copulas/bivariate/__init__.py index 9f869bef..90b80262 100644 --- a/copulas/bivariate/__init__.py +++ b/copulas/bivariate/__init__.py @@ -3,7 +3,7 @@ import numpy as np import pandas as pd -from copulas import EPSILON +from copulas.utils import EPSILON from copulas.bivariate.base import Bivariate, CopulaTypes from copulas.bivariate.clayton import Clayton from copulas.bivariate.frank import Frank diff --git a/copulas/bivariate/base.py b/copulas/bivariate/base.py index 32a37539..a12c7bef 100644 --- a/copulas/bivariate/base.py +++ b/copulas/bivariate/base.py @@ -8,8 +8,9 @@ from scipy import stats from scipy.optimize import brentq -from copulas import EPSILON, NotFittedError, random_state, validate_random_state from copulas.bivariate.utils import split_matrix +from copulas.errors import NotFittedError +from copulas.utils import EPSILON, random_state, validate_random_state class CopulaTypes(Enum): diff --git a/copulas/bivariate/frank.py b/copulas/bivariate/frank.py index 64edee05..76f29dd1 100644 --- a/copulas/bivariate/frank.py +++ b/copulas/bivariate/frank.py @@ -6,9 +6,9 @@ import scipy.integrate as integrate from scipy.optimize import least_squares -from copulas import EPSILON from copulas.bivariate.base import Bivariate, CopulaTypes from copulas.bivariate.utils import split_matrix +from copulas.utils import EPSILON MIN_FLOAT_LOG = 
np.log(sys.float_info.min) MAX_FLOAT_LOG = np.log(sys.float_info.max) diff --git a/copulas/datasets.py b/copulas/datasets.py index d47b7870..e3bb90d8 100644 --- a/copulas/datasets.py +++ b/copulas/datasets.py @@ -4,7 +4,7 @@ import pandas as pd from scipy import stats -from copulas import set_random_state, validate_random_state +from copulas.utils import set_random_state, validate_random_state def _dummy_fn(state): diff --git a/copulas/errors.py b/copulas/errors.py new file mode 100644 index 00000000..7f859533 --- /dev/null +++ b/copulas/errors.py @@ -0,0 +1,5 @@ +"""Copulas Exceptions.""" + + +class NotFittedError(Exception): + """NotFittedError class.""" diff --git a/copulas/multivariate/base.py b/copulas/multivariate/base.py index f7278f1e..121c4f07 100644 --- a/copulas/multivariate/base.py +++ b/copulas/multivariate/base.py @@ -4,7 +4,8 @@ import numpy as np -from copulas import NotFittedError, get_instance, validate_random_state +from copulas.errors import NotFittedError +from copulas.utils import get_instance, validate_random_state class Multivariate(object): diff --git a/copulas/multivariate/gaussian.py b/copulas/multivariate/gaussian.py index 2c0437be..a929d7ba 100644 --- a/copulas/multivariate/gaussian.py +++ b/copulas/multivariate/gaussian.py @@ -7,7 +7,9 @@ import pandas as pd from scipy import stats -from copulas import ( +from copulas.multivariate.base import Multivariate +from copulas.univariate import GaussianUnivariate, Univariate +from copulas.utils import ( EPSILON, check_valid_values, get_instance, @@ -16,8 +18,6 @@ store_args, validate_random_state, ) -from copulas.multivariate.base import Multivariate -from copulas.univariate import GaussianUnivariate, Univariate LOGGER = logging.getLogger(__name__) DEFAULT_DISTRIBUTION = Univariate diff --git a/copulas/multivariate/tree.py b/copulas/multivariate/tree.py index 8cbcfb1e..0eea1d0e 100644 --- a/copulas/multivariate/tree.py +++ b/copulas/multivariate/tree.py @@ -6,9 +6,9 @@ import numpy as np 
import scipy -from copulas import EPSILON, get_qualified_name from copulas.bivariate.base import Bivariate from copulas.multivariate.base import Multivariate +from copulas.utils import EPSILON, get_qualified_name LOGGER = logging.getLogger(__name__) diff --git a/copulas/multivariate/vine.py b/copulas/multivariate/vine.py index a6ce7173..99a46891 100644 --- a/copulas/multivariate/vine.py +++ b/copulas/multivariate/vine.py @@ -7,7 +7,11 @@ import numpy as np import pandas as pd -from copulas import ( +from copulas.bivariate.base import Bivariate, CopulaTypes +from copulas.multivariate.base import Multivariate +from copulas.multivariate.tree import Tree, get_tree +from copulas.univariate.gaussian_kde import GaussianKDE +from copulas.utils import ( EPSILON, check_valid_values, get_qualified_name, @@ -15,10 +19,6 @@ store_args, validate_random_state, ) -from copulas.bivariate.base import Bivariate, CopulaTypes -from copulas.multivariate.base import Multivariate -from copulas.multivariate.tree import Tree, get_tree -from copulas.univariate.gaussian_kde import GaussianKDE LOGGER = logging.getLogger(__name__) diff --git a/copulas/univariate/base.py b/copulas/univariate/base.py index 79b089e7..58a3a45c 100644 --- a/copulas/univariate/base.py +++ b/copulas/univariate/base.py @@ -6,15 +6,15 @@ import numpy as np -from copulas import ( - NotFittedError, +from copulas.errors import NotFittedError +from copulas.univariate.selection import select_univariate +from copulas.utils import ( get_instance, get_qualified_name, random_state, store_args, validate_random_state, ) -from copulas.univariate.selection import select_univariate class ParametricType(Enum): diff --git a/copulas/univariate/gaussian_kde.py b/copulas/univariate/gaussian_kde.py index 3b613188..2a0f05d8 100644 --- a/copulas/univariate/gaussian_kde.py +++ b/copulas/univariate/gaussian_kde.py @@ -4,9 +4,9 @@ from scipy.special import ndtr from scipy.stats import gaussian_kde -from copulas import EPSILON, random_state, 
store_args, validate_random_state from copulas.optimize import bisect, chandrupatla from copulas.univariate.base import BoundedType, ParametricType, ScipyModel +from copulas.utils import EPSILON, random_state, store_args, validate_random_state class GaussianKDE(ScipyModel): diff --git a/copulas/univariate/selection.py b/copulas/univariate/selection.py index e0c31e6c..d2ccb229 100644 --- a/copulas/univariate/selection.py +++ b/copulas/univariate/selection.py @@ -3,7 +3,7 @@ import numpy as np from scipy.stats import kstest -from copulas import get_instance +from copulas.utils import get_instance def select_univariate(X, candidates): diff --git a/copulas/univariate/truncated_gaussian.py b/copulas/univariate/truncated_gaussian.py index e9aff169..a4af4a87 100644 --- a/copulas/univariate/truncated_gaussian.py +++ b/copulas/univariate/truncated_gaussian.py @@ -6,8 +6,8 @@ from scipy.optimize import fmin_slsqp from scipy.stats import truncnorm -from copulas import EPSILON, store_args, validate_random_state from copulas.univariate.base import BoundedType, ParametricType, ScipyModel +from copulas.utils import EPSILON, store_args, validate_random_state class TruncatedGaussian(ScipyModel): diff --git a/copulas/utils.py b/copulas/utils.py new file mode 100644 index 00000000..6aec75a1 --- /dev/null +++ b/copulas/utils.py @@ -0,0 +1,248 @@ +"""Utils module.""" + +import contextlib +import importlib +from copy import deepcopy +from functools import wraps + +import numpy as np +import pandas as pd + +EPSILON = np.finfo(np.float32).eps + + +@contextlib.contextmanager +def set_random_state(random_state, set_model_random_state): + """Context manager for managing the random state. + + Args: + random_state (int or np.random.RandomState): + The random seed or RandomState. + set_model_random_state (function): + Function to set the random state on the model. 
+ """ + original_state = np.random.get_state() + np.random.set_state(random_state.get_state()) + + try: + yield + finally: + current_random_state = np.random.RandomState() + current_random_state.set_state(np.random.get_state()) + set_model_random_state(current_random_state) + np.random.set_state(original_state) + + +def random_state(function): + """Set the random state before calling the function. + + Args: + function (Callable): + The function to wrap around. + """ + + @wraps(function) + def wrapper(self, *args, **kwargs): + if self.random_state is None: + return function(self, *args, **kwargs) + else: + with set_random_state(self.random_state, self.set_random_state): + return function(self, *args, **kwargs) + + return wrapper + + +def validate_random_state(random_state): + """Validate random state argument. + + Args: + random_state (int, numpy.random.RandomState, tuple, or None): + Seed or RandomState for the random generator. + + Output: + numpy.random.RandomState + """ + if random_state is None: + return None + + if isinstance(random_state, int): + return np.random.RandomState(seed=random_state) + elif isinstance(random_state, np.random.RandomState): + return random_state + else: + raise TypeError( + f'`random_state` {random_state} expected to be an int ' + 'or `np.random.RandomState` object.' + ) + + +def get_instance(obj, **kwargs): + """Create new instance of the ``obj`` argument. + + Args: + obj (str, type, instance): + """ + instance = None + if isinstance(obj, str): + package, name = obj.rsplit('.', 1) + instance = getattr(importlib.import_module(package), name)(**kwargs) + elif isinstance(obj, type): + instance = obj(**kwargs) + else: + if kwargs: + instance = obj.__class__(**kwargs) + else: + args = getattr(obj, '__args__', ()) + kwargs = getattr(obj, '__kwargs__', {}) + instance = obj.__class__(*args, **kwargs) + + return instance + + +def store_args(__init__): + """Save ``*args`` and ``**kwargs`` used in the ``__init__`` of a copula. 
+ + Args: + __init__(callable): ``__init__`` function to store their arguments. + + Returns: + callable: Decorated ``__init__`` function. + """ + + @wraps(__init__) + def new__init__(self, *args, **kwargs): + args_copy = deepcopy(args) + kwargs_copy = deepcopy(kwargs) + __init__(self, *args, **kwargs) + self.__args__ = args_copy + self.__kwargs__ = kwargs_copy + + return new__init__ + + +def get_qualified_name(_object): + """Return the Fully Qualified Name from an instance or class.""" + module = _object.__module__ + if hasattr(_object, '__name__'): + _class = _object.__name__ + else: + _class = _object.__class__.__name__ + + return module + '.' + _class + + +def vectorize(function): + """Allow a method that only accepts scalars to accept vectors too. + + This decorator has two different behaviors depending on the dimensionality of the + array passed as an argument: + + **1-d array** + + It will work under the assumption that the `function` argument is a callable + with signature:: + + function(self, X, *args, **kwargs) + + where X is an scalar magnitude. + + In this case the arguments of the input array will be given one at a time, and + both the input and output of the decorated function will have shape (n,). + + **2-d array** + + It will work under the assumption that the `function` argument is a callable with signature:: + + function(self, X0, ..., Xj, *args, **kwargs) + + where `Xi` are scalar magnitudes. + + It will pass the contents of each row unpacked on each call. The input is espected to have + shape (n, j), the output a shape of (n,) + + It will return a function that is guaranteed to return a `numpy.array`. + + Args: + function(callable): Function that only accept and return scalars. + + Returns: + callable: Decorated function that can accept and return :attr:`numpy.array`. 
+ + """ + + @wraps(function) + def decorated(self, X, *args, **kwargs): + if not isinstance(X, np.ndarray): + return function(self, X, *args, **kwargs) + + if len(X.shape) == 1: + X = X.reshape([-1, 1]) + + if len(X.shape) == 2: + return np.fromiter( + (function(self, *x, *args, **kwargs) for x in X), np.dtype('float64') + ) + else: + raise ValueError('Arrays of dimensionality higher than 2 are not supported.') + + return decorated + + +def scalarize(function): + """Allow methods that only accepts 1-d vectors to work with scalars. + + Args: + function(callable): Function that accepts and returns vectors. + + Returns: + callable: Decorated function that accepts and returns scalars. + """ + + @wraps(function) + def decorated(self, X, *args, **kwargs): + scalar = not isinstance(X, np.ndarray) + + if scalar: + X = np.array([X]) + + result = function(self, X, *args, **kwargs) + if scalar: + result = result[0] + + return result + + return decorated + + +def check_valid_values(function): + """Raise an exception if the given values are not supported. + + Args: + function(callable): Method whose unique argument is a numpy.array-like object. + + Returns: + callable: Decorated function + + Raises: + ValueError: If there are missing or invalid values or if the dataset is empty. 
+ """ + + @wraps(function) + def decorated(self, X, *args, **kwargs): + if isinstance(X, pd.DataFrame): + W = X.to_numpy() + else: + W = X + + if not len(W): + raise ValueError('Your dataset is empty.') + + if not (np.issubdtype(W.dtype, np.floating) or np.issubdtype(W.dtype, np.integer)): + raise ValueError('There are non-numerical values in your data.') + + if np.isnan(W).any().any(): + raise ValueError('There are nan values in your data.') + + return function(self, X, *args, **kwargs) + + return decorated diff --git a/tests/end-to-end/test___init__.py b/tests/end-to-end/test_utils.py similarity index 97% rename from tests/end-to-end/test___init__.py rename to tests/end-to-end/test_utils.py index 1eb568be..92372e6d 100644 --- a/tests/end-to-end/test___init__.py +++ b/tests/end-to-end/test_utils.py @@ -1,6 +1,6 @@ import numpy as np -from copulas import random_state +from copulas.utils import random_state class TestRandomState: diff --git a/tests/numerical/cdf/test_cdf.py b/tests/numerical/cdf/test_cdf.py index 13956867..90754e3a 100644 --- a/tests/numerical/cdf/test_cdf.py +++ b/tests/numerical/cdf/test_cdf.py @@ -6,7 +6,7 @@ import pandas as pd import pytest -from copulas import get_instance +from copulas.utils import get_instance BASE = os.path.dirname(__file__) TESTS = glob.glob(BASE + '/test_cases/*/*.json') diff --git a/tests/numerical/fit/test_fit.py b/tests/numerical/fit/test_fit.py index b4c67c2b..4b13f2a0 100644 --- a/tests/numerical/fit/test_fit.py +++ b/tests/numerical/fit/test_fit.py @@ -6,7 +6,7 @@ import pandas as pd import pytest -from copulas import get_instance +from copulas.utils import get_instance BASE = os.path.dirname(__file__) TESTS = glob.glob(BASE + '/test_cases/*/*.json') diff --git a/tests/numerical/pdf/test_pdf.py b/tests/numerical/pdf/test_pdf.py index 00099399..b60e1d6e 100644 --- a/tests/numerical/pdf/test_pdf.py +++ b/tests/numerical/pdf/test_pdf.py @@ -6,7 +6,7 @@ import pandas as pd import pytest -from copulas import get_instance 
+from copulas.utils import get_instance BASE = os.path.dirname(__file__) TESTS = glob.glob(BASE + '/test_cases/*/*.json') diff --git a/tests/unit/multivariate/test_gaussian.py b/tests/unit/multivariate/test_gaussian.py index 16d52f8f..545edcc4 100644 --- a/tests/unit/multivariate/test_gaussian.py +++ b/tests/unit/multivariate/test_gaussian.py @@ -4,9 +4,9 @@ import numpy as np import pandas as pd -from copulas import get_qualified_name from copulas.multivariate.gaussian import GaussianMultivariate from copulas.univariate import GaussianUnivariate +from copulas.utils import get_qualified_name class TestGaussianMultivariate(TestCase): diff --git a/tests/unit/multivariate/test_tree.py b/tests/unit/multivariate/test_tree.py index 19ee0071..cc0c93b6 100644 --- a/tests/unit/multivariate/test_tree.py +++ b/tests/unit/multivariate/test_tree.py @@ -6,10 +6,10 @@ import pandas as pd import pytest -from copulas import EPSILON from copulas.bivariate import CopulaTypes from copulas.multivariate.tree import Edge, Tree, TreeTypes, get_tree from copulas.univariate.gaussian_kde import GaussianKDE +from copulas.utils import EPSILON from tests import compare_nested_dicts, compare_nested_iterables, compare_values_epsilon diff --git a/tests/unit/test___init__.py b/tests/unit/test___init__.py index 0aee8507..f0b30923 100644 --- a/tests/unit/test___init__.py +++ b/tests/unit/test___init__.py @@ -1,417 +1,12 @@ +"""Top level unit tests.""" + import sys -from unittest import TestCase -from unittest.mock import MagicMock, Mock, call, patch +from unittest.mock import MagicMock, Mock, patch -import numpy as np -import pandas as pd import pytest -from numpy.testing import assert_array_equal import copulas -from copulas import ( - _find_addons, - check_valid_values, - get_instance, - random_state, - scalarize, - validate_random_state, - vectorize, -) -from copulas.multivariate import GaussianMultivariate - - -class TestVectorize(TestCase): - def test_1d_array(self): - """When applied to a 
function it allows it to work with 1-d vectors.""" - # Setup - function = MagicMock() - function.return_value = 1 - function.__doc__ = 'Docstring of the original function.' - - instance = MagicMock() - - vector = np.array([1, 2, 3]) - args = ['positional', 'arguments'] - kwargs = {'keyword': 'arguments'} - - expected_result = np.ones(3) - expected_function_call_args_list = [ - ((instance, 1, 'positional', 'arguments'), {'keyword': 'arguments'}), - ((instance, 2, 'positional', 'arguments'), {'keyword': 'arguments'}), - ((instance, 3, 'positional', 'arguments'), {'keyword': 'arguments'}), - ] - - # Run Decorator - vectorized_function = vectorize(function) - - # Check Decorator - assert callable(vectorized_function) - assert vectorized_function.__doc__ == 'Docstring of the original function.' - - # Run decorated function - result = vectorized_function(instance, vector, *args, **kwargs) - - # Check result of decorated function call - assert result.shape == (3,) - assert_array_equal(result, expected_result) - - assert function.call_args_list == expected_function_call_args_list - - instance.assert_not_called() - assert instance.method_calls == [] - - def test_2d_array(self): - """When applied to a function it allows it to work with 2-d vectors.""" - # Setup - function = MagicMock() - function.return_value = 1 - function.__doc__ = 'Docstring of the original function.' 
- - instance = MagicMock() - - vector = np.array([ - [1, 2, 3], - [4, 5, 6], - [7, 8, 9], - ]) - args = ['positional', 'arguments'] - kwargs = {'keyword': 'arguments'} - - expected_result = np.ones(3) - expected_function_call_args_list = [ - ((instance, 1, 2, 3, 'positional', 'arguments'), {'keyword': 'arguments'}), - ((instance, 4, 5, 6, 'positional', 'arguments'), {'keyword': 'arguments'}), - ((instance, 7, 8, 9, 'positional', 'arguments'), {'keyword': 'arguments'}), - ] - - # Run Decorator - vectorized_function = vectorize(function) - - # Check Decorator - assert callable(vectorized_function) - assert vectorized_function.__doc__ == 'Docstring of the original function.' - - # Run decorated function - result = vectorized_function(instance, vector, *args, **kwargs) - - # Check result of decorated function call - assert result.shape == (3,) - assert_array_equal(result, expected_result) - - assert function.call_args_list == expected_function_call_args_list - - instance.assert_not_called() - assert instance.method_calls == [] - - def test_raises_valueerror(self): - """If given an array of dimensionality higher than 2 a ValueError is raised.""" - # Setup - function = MagicMock() - X = np.array([[[1, 2, 3]]]) - instance = MagicMock() - args = () - kwargs = {} - - # Run - vectorized_function = vectorize(function) - - # Check - error_msg = 'Arrays of dimensionality higher than 2 are not supported.' - with pytest.raises(ValueError, match=error_msg): - vectorized_function(instance, X, *args, **kwargs) - - -class TestScalarize(TestCase): - def test_decorator(self): - """When applied to a function it allows it to work with scalars.""" - # Setup - function = MagicMock() - function.__doc__ = 'Docstring of the original function.' 
- function.return_value = np.array(['return_value']) - - instance = MagicMock() - args = ['positional', 'arguments'] - kwargs = {'keyword': 'arguments'} - - expected_result = 'return_value' - - # Run Decorator - scalarized_function = scalarize(function) - - # Check Decorator - assert callable(scalarized_function) - assert scalarized_function.__doc__ == 'Docstring of the original function.' - - # Run decorated function - result = scalarized_function(instance, 0, *args, **kwargs) - - # Check decorated function - assert result == expected_result - - function.assert_called_once_with(instance, np.array([0]), *args, **kwargs) - - instance.assert_not_called() - assert instance.method_calls == [] - - -class TestCheckValidValues(TestCase): - def test_check_valid_values_raises_valuerror_if_nans(self): - """check_valid_values raises a ValueError if is given data with nans.""" - # Setup - X = np.array([[1.0, np.nan], [0.0, 1.0]]) - - instance_mock = MagicMock() - function_mock = MagicMock() - - # Run - decorated_function = check_valid_values(function_mock) - - # Check: - error_msg = 'There are nan values in your data.' - with pytest.raises(ValueError, match=error_msg): - decorated_function(instance_mock, X) - - function_mock.assert_not_called() - instance_mock.assert_not_called() - - def test_check_valid_values_raises_valueerror_if_not_numeric(self): - """check_valid_values raises a ValueError if is given data with non numeric values.""" - # Setup - X = np.array([[1.0, 'A'], [0.0, 1.0]]) - - instance_mock = MagicMock() - function_mock = MagicMock() - - # Run - decorated_function = check_valid_values(function_mock) - - # Check: - error_msg = 'There are non-numerical values in your data.' 
- with pytest.raises(ValueError, match=error_msg): - decorated_function(instance_mock, X) - - function_mock.assert_not_called() - instance_mock.assert_not_called() - - def test_check_valid_values_raises_valueerror_empty_dataset(self): - """check_valid_values raises a ValueError if given data is empty.""" - # Setup - X = np.array([]) - - instance_mock = MagicMock() - function_mock = MagicMock() - - # Run - decorated_function = check_valid_values(function_mock) - - # Check: - error_msg = 'Your dataset is empty.' - with pytest.raises(ValueError, match=error_msg): - decorated_function(instance_mock, X) - - function_mock.assert_not_called() - instance_mock.assert_not_called() - - -class TestRandomStateDecorator(TestCase): - @patch('copulas.np.random') - def test_valid_random_state(self, random_mock): - """The decorated function use the random_state attribute if present.""" - # Setup - my_function = MagicMock() - instance = MagicMock() - random_state_mock = MagicMock() - random_state_mock.get_state.return_value = 'desired random state' - instance.random_state = random_state_mock - - args = ('some', 'args') - kwargs = {'keyword': 'value'} - - random_mock.get_state.return_value = 'random state' - - # Run - decorated_function = random_state(my_function) - decorated_function(instance, *args, **kwargs) - - # Check - my_function.assert_called_once_with(instance, *args, **kwargs) - - instance.assert_not_called - random_mock.get_state.assert_has_calls([call(), call()]) - random_mock.get_state.call_count == 2 - random_mock.RandomState.assert_has_calls([call(), call().set_state('random state')]) - random_mock.set_state.assert_has_calls([ - call('desired random state'), - call('random state'), - ]) - assert random_mock.set_state.call_count == 2 - - @patch('copulas.np.random') - def test_no_random_state(self, random_mock): - """If random_state is None, the decorated function only call to the original.""" - # Setup - my_function = MagicMock() - instance = MagicMock() - 
instance.random_state = None - - args = ('some', 'args') - kwargs = {'keyword': 'value'} - - random_mock.get_state.return_value = 'random state' - - # Run - decorated_function = random_state(my_function) - decorated_function(instance, *args, **kwargs) - - # Check - my_function.assert_called_once_with(instance, *args, **kwargs) - - instance.assert_not_called - random_mock.get_state.assert_not_called() - random_mock.RandomState.assert_not_called() - random_mock.set_state.assert_not_called() - - def test_validate_random_state_int(self): - """Test `validate_random_state` with an int. - - Expect that the int is used to seed the RandomState object. - - Input: - - integer seed - Output: - - np.Random.RandomState - """ - # Setup - state = 4 - - # Run - out = validate_random_state(state) - - # Assert - assert isinstance(out, np.random.RandomState) - - def test_validate_random_state_none(self): - """Test `validate_random_state` with an input of None. - - Expect that None is also returned. - - Input: - - state of None - Output: - - None - """ - # Setup - state = None - - # Run - validate_random_state(state) - - # Assert - assert not state - - def test_validate_random_state_object(self): - """Test `validate_random_state` with a `np.random.RandomState` object. - - Expect that the same object is returned. - - Input: - - np.random.RandomState object - Output: - - state - """ - # Setup - state = np.random.RandomState(0) - - # Run - out = validate_random_state(state) - - # Assert - assert out == state - - def test_validate_random_state_invalid(self): - """Test `validate_random_state` with an invalid input type. - - Expect a TypeError to be thrown. 
- - Input: - - invalid input - Side Effect: - - TypeError - """ - # Setup - state = 'invalid input' - - # Run - with pytest.raises( - TypeError, - match=f'`random_state` {state} expected to be an int or ' - '`np.random.RandomState` object.', - ): - validate_random_state(state) - - -class TestGetInstance(TestCase): - def test_get_instance_str(self): - """Try to get a new instance from a str""" - # Run - instance = get_instance('copulas.multivariate.gaussian.GaussianMultivariate') - - # Asserts - assert not instance.fitted - assert isinstance(instance, GaussianMultivariate) - - def test_get_instance___class__(self): - """Try to get a new instance from a __clas__""" - # Run - instance = get_instance(GaussianMultivariate) - - # Asserts - assert not instance.fitted - assert isinstance(instance, GaussianMultivariate) - - def test_get_instance_instance(self): - """Try to get a new instance from a instance""" - # Run - instance = get_instance(GaussianMultivariate()) - - # Asserts - assert not instance.fitted - assert isinstance(instance, GaussianMultivariate) - - def test_get_instance_instance_fitted(self): - """Try to get a new instance from a fitted instance""" - # Run - gaussian = GaussianMultivariate() - gaussian.fit(pd.DataFrame({'a_field': list(range(10))})) - instance = get_instance(gaussian) - - # Asserts - assert not instance.fitted - assert isinstance(instance, GaussianMultivariate) - - def test_get_instance_instance_distribution(self): - """Try to get a new instance from a instance with distribution""" - # Run - instance = get_instance( - GaussianMultivariate(distribution='copulas.univariate.truncnorm.TruncNorm') - ) - - # Asserts - assert not instance.fitted - assert isinstance(instance, GaussianMultivariate) - assert instance.distribution == 'copulas.univariate.truncnorm.TruncNorm' - - def test_get_instance_with_kwargs(self): - """Try to get a new instance with kwargs""" - # Run - instance = get_instance( - GaussianMultivariate, 
distribution='copulas.univariate.truncnorm.TruncNorm' - ) - - # Asserts - assert not instance.fitted - assert isinstance(instance, GaussianMultivariate) - assert instance.distribution == 'copulas.univariate.truncnorm.TruncNorm' +from copulas import _find_addons @pytest.fixture diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py new file mode 100644 index 00000000..c426f36b --- /dev/null +++ b/tests/unit/test_utils.py @@ -0,0 +1,413 @@ +"""Unit tests for utils module.""" + +from unittest import TestCase +from unittest.mock import MagicMock, call, patch + +import numpy as np +import pandas as pd +import pytest +from numpy.testing import assert_array_equal + +from copulas.multivariate import GaussianMultivariate +from copulas.utils import ( + check_valid_values, + get_instance, + random_state, + scalarize, + validate_random_state, + vectorize, +) + + +class TestVectorize(TestCase): + def test_1d_array(self): + """When applied to a function it allows it to work with 1-d vectors.""" + # Setup + function = MagicMock() + function.return_value = 1 + function.__doc__ = 'Docstring of the original function.' + + instance = MagicMock() + + vector = np.array([1, 2, 3]) + args = ['positional', 'arguments'] + kwargs = {'keyword': 'arguments'} + + expected_result = np.ones(3) + expected_function_call_args_list = [ + ((instance, 1, 'positional', 'arguments'), {'keyword': 'arguments'}), + ((instance, 2, 'positional', 'arguments'), {'keyword': 'arguments'}), + ((instance, 3, 'positional', 'arguments'), {'keyword': 'arguments'}), + ] + + # Run Decorator + vectorized_function = vectorize(function) + + # Check Decorator + assert callable(vectorized_function) + assert vectorized_function.__doc__ == 'Docstring of the original function.' 
+ + # Run decorated function + result = vectorized_function(instance, vector, *args, **kwargs) + + # Check result of decorated function call + assert result.shape == (3,) + assert_array_equal(result, expected_result) + + assert function.call_args_list == expected_function_call_args_list + + instance.assert_not_called() + assert instance.method_calls == [] + + def test_2d_array(self): + """When applied to a function it allows it to work with 2-d vectors.""" + # Setup + function = MagicMock() + function.return_value = 1 + function.__doc__ = 'Docstring of the original function.' + + instance = MagicMock() + + vector = np.array([ + [1, 2, 3], + [4, 5, 6], + [7, 8, 9], + ]) + args = ['positional', 'arguments'] + kwargs = {'keyword': 'arguments'} + + expected_result = np.ones(3) + expected_function_call_args_list = [ + ((instance, 1, 2, 3, 'positional', 'arguments'), {'keyword': 'arguments'}), + ((instance, 4, 5, 6, 'positional', 'arguments'), {'keyword': 'arguments'}), + ((instance, 7, 8, 9, 'positional', 'arguments'), {'keyword': 'arguments'}), + ] + + # Run Decorator + vectorized_function = vectorize(function) + + # Check Decorator + assert callable(vectorized_function) + assert vectorized_function.__doc__ == 'Docstring of the original function.' + + # Run decorated function + result = vectorized_function(instance, vector, *args, **kwargs) + + # Check result of decorated function call + assert result.shape == (3,) + assert_array_equal(result, expected_result) + + assert function.call_args_list == expected_function_call_args_list + + instance.assert_not_called() + assert instance.method_calls == [] + + def test_raises_valueerror(self): + """If given an array of dimensionality higher than 2 a ValueError is raised.""" + # Setup + function = MagicMock() + X = np.array([[[1, 2, 3]]]) + instance = MagicMock() + args = () + kwargs = {} + + # Run + vectorized_function = vectorize(function) + + # Check + error_msg = 'Arrays of dimensionality higher than 2 are not supported.' 
+ with pytest.raises(ValueError, match=error_msg): + vectorized_function(instance, X, *args, **kwargs) + + +class TestScalarize(TestCase): + def test_decorator(self): + """When applied to a function it allows it to work with scalars.""" + # Setup + function = MagicMock() + function.__doc__ = 'Docstring of the original function.' + function.return_value = np.array(['return_value']) + + instance = MagicMock() + args = ['positional', 'arguments'] + kwargs = {'keyword': 'arguments'} + + expected_result = 'return_value' + + # Run Decorator + scalarized_function = scalarize(function) + + # Check Decorator + assert callable(scalarized_function) + assert scalarized_function.__doc__ == 'Docstring of the original function.' + + # Run decorated function + result = scalarized_function(instance, 0, *args, **kwargs) + + # Check decorated function + assert result == expected_result + + function.assert_called_once_with(instance, np.array([0]), *args, **kwargs) + + instance.assert_not_called() + assert instance.method_calls == [] + + +class TestCheckValidValues(TestCase): + def test_check_valid_values_raises_valuerror_if_nans(self): + """check_valid_values raises a ValueError if is given data with nans.""" + # Setup + X = np.array([[1.0, np.nan], [0.0, 1.0]]) + + instance_mock = MagicMock() + function_mock = MagicMock() + + # Run + decorated_function = check_valid_values(function_mock) + + # Check: + error_msg = 'There are nan values in your data.' 
+ with pytest.raises(ValueError, match=error_msg): + decorated_function(instance_mock, X) + + function_mock.assert_not_called() + instance_mock.assert_not_called() + + def test_check_valid_values_raises_valueerror_if_not_numeric(self): + """check_valid_values raises a ValueError if is given data with non numeric values.""" + # Setup + X = np.array([[1.0, 'A'], [0.0, 1.0]]) + + instance_mock = MagicMock() + function_mock = MagicMock() + + # Run + decorated_function = check_valid_values(function_mock) + + # Check: + error_msg = 'There are non-numerical values in your data.' + with pytest.raises(ValueError, match=error_msg): + decorated_function(instance_mock, X) + + function_mock.assert_not_called() + instance_mock.assert_not_called() + + def test_check_valid_values_raises_valueerror_empty_dataset(self): + """check_valid_values raises a ValueError if given data is empty.""" + # Setup + X = np.array([]) + + instance_mock = MagicMock() + function_mock = MagicMock() + + # Run + decorated_function = check_valid_values(function_mock) + + # Check: + error_msg = 'Your dataset is empty.' 
+ with pytest.raises(ValueError, match=error_msg): + decorated_function(instance_mock, X) + + function_mock.assert_not_called() + instance_mock.assert_not_called() + + +class TestRandomStateDecorator(TestCase): + @patch('copulas.utils.np.random') + def test_valid_random_state(self, random_mock): + """The decorated function use the random_state attribute if present.""" + # Setup + my_function = MagicMock() + instance = MagicMock() + random_state_mock = MagicMock() + random_state_mock.get_state.return_value = 'desired random state' + instance.random_state = random_state_mock + + args = ('some', 'args') + kwargs = {'keyword': 'value'} + + random_mock.get_state.return_value = 'random state' + + # Run + decorated_function = random_state(my_function) + decorated_function(instance, *args, **kwargs) + + # Check + my_function.assert_called_once_with(instance, *args, **kwargs) + + instance.assert_not_called + random_mock.get_state.assert_has_calls([call(), call()]) + random_mock.get_state.call_count == 2 + random_mock.RandomState.assert_has_calls([call(), call().set_state('random state')]) + random_mock.set_state.assert_has_calls([ + call('desired random state'), + call('random state'), + ]) + assert random_mock.set_state.call_count == 2 + + @patch('copulas.utils.np.random') + def test_no_random_state(self, random_mock): + """If random_state is None, the decorated function only call to the original.""" + # Setup + my_function = MagicMock() + instance = MagicMock() + instance.random_state = None + + args = ('some', 'args') + kwargs = {'keyword': 'value'} + + random_mock.get_state.return_value = 'random state' + + # Run + decorated_function = random_state(my_function) + decorated_function(instance, *args, **kwargs) + + # Check + my_function.assert_called_once_with(instance, *args, **kwargs) + + instance.assert_not_called + random_mock.get_state.assert_not_called() + random_mock.RandomState.assert_not_called() + random_mock.set_state.assert_not_called() + + def 
test_validate_random_state_int(self): + """Test `validate_random_state` with an int. + + Expect that the int is used to seed the RandomState object. + + Input: + - integer seed + Output: + - np.Random.RandomState + """ + # Setup + state = 4 + + # Run + out = validate_random_state(state) + + # Assert + assert isinstance(out, np.random.RandomState) + + def test_validate_random_state_none(self): + """Test `validate_random_state` with an input of None. + + Expect that None is also returned. + + Input: + - state of None + Output: + - None + """ + # Setup + state = None + + # Run + validate_random_state(state) + + # Assert + assert not state + + def test_validate_random_state_object(self): + """Test `validate_random_state` with a `np.random.RandomState` object. + + Expect that the same object is returned. + + Input: + - np.random.RandomState object + Output: + - state + """ + # Setup + state = np.random.RandomState(0) + + # Run + out = validate_random_state(state) + + # Assert + assert out == state + + def test_validate_random_state_invalid(self): + """Test `validate_random_state` with an invalid input type. + + Expect a TypeError to be thrown. 
+ + Input: + - invalid input + Side Effect: + - TypeError + """ + # Setup + state = 'invalid input' + + # Run + with pytest.raises( + TypeError, + match=f'`random_state` {state} expected to be an int or ' + '`np.random.RandomState` object.', + ): + validate_random_state(state) + + +class TestGetInstance(TestCase): + def test_get_instance_str(self): + """Try to get a new instance from a str""" + # Run + instance = get_instance('copulas.multivariate.gaussian.GaussianMultivariate') + + # Asserts + assert not instance.fitted + assert isinstance(instance, GaussianMultivariate) + + def test_get_instance___class__(self): + """Try to get a new instance from a __clas__""" + # Run + instance = get_instance(GaussianMultivariate) + + # Asserts + assert not instance.fitted + assert isinstance(instance, GaussianMultivariate) + + def test_get_instance_instance(self): + """Try to get a new instance from a instance""" + # Run + instance = get_instance(GaussianMultivariate()) + + # Asserts + assert not instance.fitted + assert isinstance(instance, GaussianMultivariate) + + def test_get_instance_instance_fitted(self): + """Try to get a new instance from a fitted instance""" + # Run + gaussian = GaussianMultivariate() + gaussian.fit(pd.DataFrame({'a_field': list(range(10))})) + instance = get_instance(gaussian) + + # Asserts + assert not instance.fitted + assert isinstance(instance, GaussianMultivariate) + + def test_get_instance_instance_distribution(self): + """Try to get a new instance from a instance with distribution""" + # Run + instance = get_instance( + GaussianMultivariate(distribution='copulas.univariate.truncnorm.TruncNorm') + ) + + # Asserts + assert not instance.fitted + assert isinstance(instance, GaussianMultivariate) + assert instance.distribution == 'copulas.univariate.truncnorm.TruncNorm' + + def test_get_instance_with_kwargs(self): + """Try to get a new instance with kwargs""" + # Run + instance = get_instance( + GaussianMultivariate, 
distribution='copulas.univariate.truncnorm.TruncNorm' + ) + + # Asserts + assert not instance.fitted + assert isinstance(instance, GaussianMultivariate) + assert instance.distribution == 'copulas.univariate.truncnorm.TruncNorm'