Source code for pycsou.func.base

# #############################################################################
# base.py
# =======
# Author : Matthieu Simeoni [matthieu.simeoni@gmail.com]
# #############################################################################

r"""
Interface classes for constructing functionals.
"""

from numbers import Number
from typing import Union, Callable

import numpy as np
import joblib as job

from pycsou.core.functional import ProximableFunctional, DifferentiableFunctional, LinearFunctional
from pycsou.core.map import MapHStack, DiffMapHStack


[docs]class ProxFuncHStack(ProximableFunctional, MapHStack): r""" Stack functionals horizontally. Consider a collection :math:`\{f_i:\mathbb{R}^{N_i}\to \mathbb{R}, i=1,\ldots, k\}` of functionals. Their horizontal stacking is defined as the operator .. math:: h:\begin{cases}\mathbb{R}^{N_1}\times \cdots \times\mathbb{R}^{N_k}\to \mathbb{R}\\ (\mathbf{x}_1,\ldots, \mathbf{x}_k)\mapsto \sum_{i=1}^k f_i(\mathbf{x}_i). \end{cases} The proximity operator of :math:`h` is moreover given by ([ProxAlg]_ Section 2.1): .. math:: \mathbf{\text{prox}}_{\tau h}(\mathbf{x}_1,\ldots, \mathbf{x}_k)=\left(\mathbf{\text{prox}}_{\tau f_1}(\mathbf{x}_1),\ldots, \mathbf{\text{prox}}_{\tau f_k}(\mathbf{x}_k)\right). Examples -------- .. testsetup:: import numpy as np from pycsou.func.base import ProxFuncHStack .. doctest:: >>> from pycsou.func.penalty import L1Norm, SquaredL1Norm >>> func1 = L1Norm(dim=10) >>> func2 = SquaredL1Norm(dim=10) >>> func = ProxFuncHStack(func1, func2) >>> x = np.arange(10); y= x/2; tau=0.1 >>> np.allclose(func.prox(np.concatenate((x,y)), tau), np.concatenate((func1.prox(x, tau), func2.prox(y, tau)))) True >>> parfunc=ProxFuncHStack(func1, func2, n_jobs=-1) >>> np.allclose(func.prox(np.concatenate((x,y)), tau),parfunc.prox(np.concatenate((x,y)), tau)) True """
[docs] def __init__(self, *proxfuncs, n_jobs: int = 1, joblib_backend: str = 'loky'): r""" Parameters ---------- proxfuncs: ProximableFunctional List of proximable functionals to stack. n_jobs: int Number of cores to be used for parallel evaluation of the proximal operator. If ``n_jobs==1``, the proximal operator of the functional stack is evaluated sequentially, otherwise it is evaluated in parallel. Setting ``n_jobs=-1`` uses all available cores. joblib_backend: str Joblib backend (`more details here <https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html>`_). """ MapHStack.__init__(self, *proxfuncs, n_jobs=n_jobs, joblib_backend=joblib_backend) self.proxfuncs = self.maps ProximableFunctional.__init__(self, dim=self.shape[1], data=None, is_differentiable=self.is_differentiable, is_linear=self.is_linear)
[docs] def prox(self, x: Union[Number, np.ndarray], tau: Number) -> Union[Number, np.ndarray]: x_split = np.split(x, self.sections) if self.n_jobs == 1: result = [func.prox(x_split[i], tau) for i, func in enumerate(self.proxfuncs)] else: with job.Parallel(backend=self.joblib_backend, n_jobs=self.n_jobs, verbose=False) as parallel: result = parallel(job.delayed(func.prox) (x_split[i], tau) for i, func in enumerate(self.proxfuncs)) return np.concatenate(result, axis=0)
[docs]class DiffFuncHStack(DifferentiableFunctional, DiffMapHStack):
[docs] def __init__(self, *difffuncs, n_jobs: int = 1, joblib_backend: str = 'loky'): r""" Parameters ---------- difffuncs: DifferentiableFunctional List of differentiable functionals to stack. n_jobs: int Number of cores to be used for parallel evaluation of the functional stack and its gradient. If ``n_jobs==1``, the proximal operator of the functional stack and its gradient are evaluated sequentially, otherwise they are evaluated in parallel. Setting ``n_jobs=-1`` uses all available cores. joblib_backend: str Joblib backend (`more details here <https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html>`_). """ DiffMapHStack.__init__(self, *difffuncs, n_jobs=n_jobs, joblib_backend=joblib_backend) self.difffuncs = self.maps DifferentiableFunctional.__init__(self, dim=self.shape[1], data=None, is_linear=self.is_linear, lipschitz_cst=self.lipschitz_cst, diff_lipschitz_cst=self.diff_lipschitz_cst)
[docs] def jacobianT(self, arg: Union[Number, np.ndarray]) -> Union[Number, np.ndarray]: arg_split = np.split(arg, self.sections) if self.n_jobs == 1: jacobianT_list = [difffunc.jacobianT(arg_split[i]) for i, difffunc in enumerate(self.maps)] else: with job.Parallel(backend=self.joblib_backend, n_jobs=self.n_jobs, verbose=False) as parallel: jacobianT_list = parallel(job.delayed(difffunc.jacobianT)(arg_split[i]) for i, difffunc in enumerate(self.maps)) result = np.concatenate(jacobianT_list, axis=0) return result
[docs]class ExplicitLinearFunctional(LinearFunctional): r""" Base class for linear functionals. """
[docs] def __init__(self, vec: np.ndarray, dtype: type = np.float64): self.vec = vec.flatten().astype(dtype) super(ExplicitLinearFunctional, self).__init__(dim=vec.size, dtype=dtype, is_explicit=True)
def __call__(self, x: np.ndarray) -> np.ndarray: return np.dot(self.vec, x.flatten())
[docs] def adjoint(self, y: Number) -> Number: return y * self.vec
[docs]class IndicatorFunctional(ProximableFunctional): r""" Base class for indicator functionals. """
[docs] def __init__(self, dim: int, condition_func: Callable, projection_func: Callable): r""" Parameters ---------- dim: int Dimension of the functional's domain. condition_func: Callable Condition delimiting the domain of the indicator functional. projection_func: Callable Projecton onto the domain of the indicator functional. See Also -------- :py:func:`~pycsou.math.prox.proj_nonnegative_orthant`, :py:func:`~pycsou.math.prox.proj_segment` """ super(IndicatorFunctional, self).__init__(dim=dim, data=None, is_differentiable=False, is_linear=False) self.condition_func = condition_func self.projection_func = projection_func
def __call__(self, x: Union[Number, np.ndarray], **kwargs) -> Number: return 0 if self.condition_func(x, **kwargs) else np.infty
[docs] def prox(self, x: Union[Number, np.ndarray], tau: Number, **kwargs) -> Union[Number, np.ndarray]: return self.projection_func(x, **kwargs)
[docs]class NullDifferentiableFunctional(DifferentiableFunctional): r""" Null differentiable functional. """
[docs] def __init__(self, dim: int): r""" Parameters ---------- dim: int Dimension of the functional's domain. """ super(NullDifferentiableFunctional, self).__init__(dim=dim, is_linear=True, lipschitz_cst=0, diff_lipschitz_cst=0)
def __call__(self, x: Union[Number, np.ndarray]) -> Number: return 0
[docs] def jacobianT(self, arg: Union[Number, np.ndarray]) -> Union[Number, np.ndarray]: return np.zeros(shape=self.dim)
[docs]class NullProximableFunctional(ProximableFunctional): r""" Null proximable functional. """
[docs] def __init__(self, dim: int): r""" Parameters ---------- dim: int Dimension of the functional's domain. """ super(NullProximableFunctional, self).__init__(dim=dim, is_linear=True)
def __call__(self, x: Union[Number, np.ndarray]) -> Number: return 0
[docs] def prox(self, x: Union[Number, np.ndarray], tau: Number) -> Union[Number, np.ndarray]: return x
[docs]class LpNorm(ProximableFunctional): r""" Base class for :math:`\ell_p`-norms. Proximity operators of :math:`\ell_p`-norms are computed via Moreau's identity and the knowledge of the projection onto the conjugate :math:`\ell_p`-ball with :math:`1/p+1/q=1.` """
[docs] def __init__(self, dim: int, proj_lq_ball: Callable): r""" Parameters ---------- dim: int Dimension of the functional's domain. proj_lq_ball: Callable[x: np.ndarray, radius: float] Projection onto the :math:`\ell_q`-ball where :math:`1/p+1/q=1.` See Also -------- :py:func:`~pycsou.math.prox.proj_l2_ball`, :py:func:`~pycsou.math.prox.proj_l1_ball`, :py:func:`~pycsou.math.prox.proj_linfty_ball` """ super(LpNorm, self).__init__(dim=dim, data=None, is_differentiable=False, is_linear=False) self.proj_lq_ball = proj_lq_ball
[docs] def prox(self, x: Union[Number, np.ndarray], tau: Number) -> Union[Number, np.ndarray]: return x - tau * self.proj_lq_ball(x / tau, radius=1)