Source code for pycsou.core.map

# #############################################################################
# map.py
# ======
# Author : Matthieu Simeoni [matthieu.simeoni@gmail.com]
# #############################################################################

r"""
Abstract classes for multidimensional nonlinear maps.
"""

import numpy as np
import joblib as job

from abc import ABC, abstractmethod
from typing import Union, Tuple
from numbers import Number
from pycsou.util.misc import is_range_broadcastable, range_broadcast_shape


[docs]class Map(ABC): r""" Base class for multidimensional maps. Any instance/subclass of this class must at least implement the abstract method ``__call__``. This class supports the following arithmetic operators ``+``, ``-``, ``*``, ``@``, ``**`` and ``/``, implemented with the class methods ``__add__``/``__radd__``, ``__sub__``/``__neg__``, ``__mul__``/``__rmul__``, ``__matmul__``, ``__pow__``, ``__truediv__``. Such arithmetic operators can be used to *add*, *substract*, *scale*, *compose*, *exponentiate* or *evaluate* ``Map`` instances. Examples -------- .. testsetup:: import numpy as np from pycsou.func.penalty import L1Norm, SquaredL2Norm from pycsou.linop.conv import Convolve1D from scipy import signal x = np.arange(10) filter = signal.hann(5) filter[filter.size//2:] = 0 f1 = L1Norm(dim=x.size) f2 = SquaredL2Norm(dim=x.size) L1 = Convolve1D(size=x.size, filter=filter) L2 = Convolve1D(size=x.size, filter=filter/2) Consider four maps: two nonlinear functionals :math:`f_1:\mathbb{R}^{10}\to \mathbb{R}`, :math:`f_2:\mathbb{R}^{10}\to \mathbb{R}` and two linear operators :math:`L_1:\mathbb{R}^{10}\to \mathbb{R}^{10}`, :math:`L_2:\mathbb{R}^{10}\to \mathbb{R}^{10}`. .. doctest:: >>> print(f1.shape, f2.shape, L1.shape, L2.shape) (1, 10) (1, 10) (10, 10) (10, 10) We can combine linearly/compose the maps with consistent domains/ranges: .. doctest:: >>> f3 = f1 / 3 + np.pi * f2 >>> np.allclose(f3(x), f1(x) / 3 + np.pi * f2(x)) True >>> L3 = L1 * 3 - (L2 ** 2) / 6 >>> np.allclose(L3(x), L1(x) * 3 - (L2(L2(x))) / 6) True >>> f4 = f3 * L3 >>> np.allclose(f4(x), f3(L3(x))) True Note that multiplying a map with an array is the same as evaluating the map at the array. .. doctest:: >>> np.allclose(f3 * x, f3(x)) True The multiplication operator ``@`` can also be used in place of ``*``, in compliance with Numpy's interface: .. doctest:: >>> np.allclose(f3 * x, f3 @ x) True >>> np.allclose((f1 * L1)(x), (f1 @ L1)(x)) True Finally, maps can be shifted via the method ``shifter``: .. doctest:: >>> f5=f4.shifter(shift=2 * x) >>> np.allclose(f5(x), f4(x + 2 * x)) True """
[docs] def __init__(self, shape: Tuple[int, int], is_linear: bool = False, is_differentiable: bool = False): r""" Parameters ---------- shape: Tuple[int, int] Shape of the map. is_linear: bool Whether the map is linear or not. is_differentiable: bool Whether the map is differentiable or not. """ if len(shape) > 2: raise NotImplementedError( 'Shapes of map objects must be tuples of length 2 (tensorial maps not supported).') self.shape = shape self.is_linear = is_linear self.is_functional = True if self.shape[0] == 1 else False self.is_differentiable = is_differentiable super(Map, self).__init__()
[docs] @abstractmethod def __call__(self, arg: Union[Number, np.ndarray]) -> Union[Number, np.ndarray]: r""" Call self as a function. Parameters ---------- arg: Union[Number, np.ndarray] Argument of the map. Returns ------- Union[Number, np.ndarray] Value of ``arg`` through the map. """ pass
[docs] def apply_along_axis(self, arr: np.ndarray, axis: int = 0) -> np.ndarray: r""" Apply the map to 1-D slices along the given axis. Parameters ---------- arr: np.ndarray Input array. axis: int Axis along which ``arr`` is sliced. Returns ------- np.ndarray The output array. The shape of the latter is identical to the shape of ``arr``, except along the specified axis dimension. This axis is removed, and replaced with new dimensions equal to ``self.shape[0]``. If ``self.shape[0]==1`` the output array will have one fewer dimensions than ``arr``. Raises ------ ValueError If ``arr.shape[axis] != self.shape[1]``. Examples -------- .. doctest:: >>> from pycsou.linop import DownSampling >>> from pycsou.func import SquaredL2Norm >>> D=DownSampling(size=20, downsampling_factor=2) >>> arr=np.arange(200).reshape(20,2,5) >>> out = D.apply_along_axis(arr, axis=0) >>> out.shape (10, 2, 5) >>> ff = SquaredL2Norm(dim=2) >>> out = ff.apply_along_axis(arr, axis=1) >>> out.shape (20, 5) """ if arr.shape[axis] != self.shape[1]: raise ValueError( f"Array size along specified axis and the map domain's dimension differ: {arr.shape[axis]} != {self.shape[1]}.") return np.apply_along_axis(func1d=self.__call__, axis=axis, arr=arr)
[docs] def shifter(self, shift: Union[Number, np.ndarray]) -> 'MapShifted': r""" Returns a shifted version of the map. Parameters ---------- shift: Union[Number, np.ndarray] Shift vector. Returns ------- :py:class:`~pycsou.core.map.MapShifted` Shifted map. Notes ----- Let ``A`` be a ``Map`` instance and ``B=A.shifter(y)`` with ``y`` some vector in the domain of ``A``. Then we have: ``B(x)=A(x+y)``. """ return MapShifted(map=self, shift=shift)
[docs] def __add__(self, other: 'Map') -> 'MapSum': r""" Add a map with another map instance. Parameters ---------- other: Union['Map', np.ndarray] The other map or array to be added. Returns ------- :py:class:`~pycsou.core.map.MapSum` Sum of the map with another map. Raises ------ NotImplementedError If ``other`` is not a map. """ # if isinstance(other, np.ndarray): # return MapBias(self, bias=other) if isinstance(other, Map): return MapSum(self, other) else: raise NotImplementedError
def __radd__(self, other: 'Map') -> 'MapSum': r""" Add a map with another map instance when the latter is on the right hand side of the summation. Parameters ---------- other: Union['Map', np.ndarray] The other map or array to be added. Returns ------- :py:class:`~pycsou.core.map.MapSum` Sum of the map with another map. Raises ------ NotImplementedError If ``other`` is not a map. """ # if isinstance(other, np.ndarray): # return MapBias(self, bias=other) if isinstance(other, Map): return MapSum(other, self) else: raise NotImplementedError
[docs] def __mul__(self, other: Union[Number, 'Map', np.ndarray]) -> Union['MapComp', np.ndarray]: r""" Multiply a map with another map, a scalar, or an array. The behaviour of this method depends on the type of ``other``: * If ``other`` is another map, then it returns the composition of the map with ``other``. * If ``other`` is an array with compatible shape, then it calls the map on ``other``. * If ``other`` is a scalar, then it multiplies the map with this scalar. Parameters ---------- other: Union[Number, 'Map', np.ndarray] Scalar, map or array with which to multiply. Returns ------- :py:class:`~pycsou.core.map.MapComp`, np.ndarray Composition of the map with another map, or product with a scalar or array. Raises ------ NotImplementedError If ``other`` is not a scalar, a map or an array. See Also -------- :py:func:`~pycsou.core.map.Map.__matmul__`, :py:func:`~pycsou.core.map.Map.__rmul__` """ if isinstance(other, Number): from pycsou.linop.base import HomothetyMap other = HomothetyMap(constant=other, size=self.shape[1]) if isinstance(other, np.ndarray): return self(other) elif isinstance(other, Map): return MapComp(self, other) else: raise NotImplementedError
def __rmul__(self, other: Union[Number, 'Map']) -> 'MapComp': if isinstance(other, Number): from pycsou.linop.base import HomothetyMap other = HomothetyMap(constant=other, size=self.shape[0]) if isinstance(other, Map): return MapComp(other, self) else: raise NotImplementedError
[docs] def __matmul__(self, other: 'Map') -> 'MapComp': r"""Alias for :py:func:`~pycsou.core.map.Map.__mul__` offered to comply with Numpy's interface.""" return self.__mul__(other)
[docs] def __neg__(self) -> 'MapComp': r"""Negates a map. Alias for ``self.__mul__(-1)``.""" return self.__mul__(-1)
[docs] def __sub__(self, other: Union['Map', np.ndarray]) -> Union['MapSum', 'MapBias']: r"""Substracts a map, scalar, or array to another map. Alias for ``self.__add__(other.__neg__())``.""" other = other.__neg__() return self.__add__(other)
[docs] def __pow__(self, power: int) -> 'MapComp': r"""Raise a map to a certain ``power``. Alias for ``A*A*...*A`` with ``power`` multiplications.""" if type(power) is int: exp_map = self for i in range(1, power): exp_map = self.__mul__(exp_map) return exp_map else: raise NotImplementedError
[docs] def __truediv__(self, scalar: Number) -> 'MapComp': r"""Divides a map by a ``scalar``. Alias for ``self.__mul__(1 / scalar)``.""" if isinstance(scalar, Number): return self.__mul__(1 / scalar) else: raise NotImplementedError
class MapShifted(Map): def __init__(self, map: Map, shift: Union[Number, np.ndarray]): self.map = map self.shift = shift if shift.size != map.shape[1]: raise TypeError('Invalid shift size.') Map.__init__(self, shape=map.shape, is_linear=map.is_linear, is_differentiable=map.is_differentiable) def __call__(self, arg: Union[Number, np.ndarray]) -> Union[Number, np.ndarray]: return self.map(arg + self.shift) class MapSum(Map): def __init__(self, map1: Map, map2: Map): if not is_range_broadcastable(map1.shape, map2.shape): raise ValueError('Cannot sum two maps with inconsistent range or domain sizes.') else: Map.__init__(self, shape=range_broadcast_shape(map1.shape, map2.shape), is_linear=map1.is_linear & map2.is_linear, is_differentiable=map1.is_differentiable & map2.is_differentiable) self.map1, self.map2 = map1, map2 def __call__(self, arg: Union[Number, np.ndarray]) -> Union[Number, np.ndarray]: return self.map1(arg) + self.map2(arg) # class MapBias(Map): # def __init__(self, map_: Map, bias: np.ndarray): # if not is_range_broadcastable(map_.shape, bias.shape): # raise ValueError('Inconsistent range sizes between map and bias.') # else: # super(MapBias, self).__init__(shape=map_.shape, is_linear=False, is_differentiable=map_.is_differentiable) # self.map, self.bias = map_, bias # # def __call__(self, arg: Union[Number, np.ndarray]) -> Union[Number, np.ndarray]: # return self.map(arg) + self.bias class MapComp(Map): def __init__(self, map1: Map, map2: Map): if map1.shape[1] != map2.shape[0] and map2.shape[0] != 1 and map1.shape[1] != 1: raise ValueError('Cannot compose two maps with inconsistent range or domain sizes.') else: Map.__init__(self, shape=(map1.shape[0], map2.shape[1]), is_linear=map1.is_linear & map2.is_linear, is_differentiable=map1.is_differentiable & map2.is_differentiable) self.map1 = map1 self.map2 = map2 def __call__(self, arg: Union[Number, np.ndarray]) -> Union[Number, np.ndarray]: return self.map1(self.map2(arg))
[docs]class DifferentiableMap(Map): r""" Base class for multidimensional differentiable maps. Any instance/subclass of this class must at least implement the abstract methods ``__call__`` and ``jacobianT``. The attributes ``lipschitz_cst``, ``diff_lipschitz_cst`` and the method ``jacobianT`` are automatically updated using standard differentiation rules when scaling, composing , summing or shifting differentiable maps. Examples -------- .. testsetup:: import numpy as np from pycsou.func.penalty import SquaredL2Norm from pycsou.linop.diff import FirstDerivative x = np.arange(10) f = SquaredL2Norm(dim=x.size) L1 = FirstDerivative(size=x.size) L1.compute_lipschitz_cst() L2 = FirstDerivative(size=x.size, kind='centered') L2.compute_lipschitz_cst() Consider three differentiable maps: a nonlinear differentiable functional :math:`f:\mathbb{R}^{10}\to \mathbb{R}`, and two linear operators :math:`L_1:\mathbb{R}^{10}\to \mathbb{R}^{10}`, :math:`L_2:\mathbb{R}^{10}\to \mathbb{R}^{10}`. .. doctest:: >>> print(f.lipschitz_cst, f.diff_lipschitz_cst) inf 2 >>> print(np.round(L1.lipschitz_cst,1), np.round(L1.diff_lipschitz_cst,1)) 2.0 2.0 >>> print(np.round(L2.lipschitz_cst,1), np.round(L2.diff_lipschitz_cst,1)) 1.5 1.5 >>> L3 = 2 * L1 + (L2 ** 2) / 3 >>> np.allclose(L3.lipschitz_cst, 2 * L1.lipschitz_cst + (L2.lipschitz_cst ** 2)/3) True >>> map_ = f * L1 >>> np.allclose(map_.lipschitz_cst, f.lipschitz_cst * L1.lipschitz_cst) True >>> np.allclose(map_.jacobianT(x), L1.jacobianT(x) * f.jacobianT(L1(x))) True Notes ----- This class supports the following arithmetic operators ``+``, ``-``, ``*``, ``@``, ``**`` and ``/``, implemented with the class methods ``__add__``/``__radd__``, ``__sub__``/``__neg__``, ``__mul__``/``__rmul__``, ``__matmul__``, ``__pow__``, ``__truediv__``. Such arithmetic operators can be used to *add*, *substract*, *scale*, *compose*, *exponentiate* or *evaluate* ``DifferentiableMap`` instances. """
[docs] def __init__(self, shape: Tuple[int, int], is_linear: bool = False, lipschitz_cst: float = np.infty, diff_lipschitz_cst: float = np.infty): r""" Parameters ---------- shape: Tuple[int, int] Shape of the differentiable map. is_linear: bool Whether the differentiable map is linear or not. lipschitz_cst: float Lispchitz constant of the differentiable map if it exists/is known. Default to :math:`+\infty`. diff_lipschitz_cst: float Lispchitz constant of the derivative of the differentiable map if it exists/is known. Default to :math:`+\infty`. """ super(DifferentiableMap, self).__init__(shape=shape, is_linear=is_linear, is_differentiable=True) self.lipschitz_cst = lipschitz_cst self.diff_lipschitz_cst = diff_lipschitz_cst
[docs] @abstractmethod def jacobianT(self, arg: Union[Number, np.ndarray]) -> 'LinearOperator': r""" Transpose of the Jacobian matrix of the differentiable map evaluated at ``arg``. Parameters ---------- arg: Union[Number, np.ndarray] Point at which the transposed Jacobian matrix is evaluated. Returns ------- ::py:class:`~pycsou.core.linop.LinearOperator` Linear operator associated to the transposed Jacobian matrix. """ pass
[docs] def gradient(self, arg: Union[Number, np.ndarray]) -> 'LinearOperator': r""" Alias for ``self.jacobianT``. """ return self.jacobianT(arg)
[docs] def compute_lipschitz_cst(self): r""" User-implemented method to compute the Lipschitz constant of the map. """ pass
[docs] def compute_diff_lipschitz_cst(self): r""" User-implemented method to compute the Lipschitz constant of the derivative of the map. """ pass
[docs] def shifter(self, shift: Union[Number, np.ndarray]) -> 'DiffMapShifted': r""" Returns a shifted version of the map. Parameters ---------- shift: Union[Number, np.ndarray] Shift vector. Returns ------- :py:class:`~pycsou.core.map.DiffMapShifted` Shifted map. See Also -------- :py:meth:`~pycsou.core.map.Map.shifter` """ return DiffMapShifted(map=self, shift=shift)
def __add__(self, other: Union['Map', 'DifferentiableMap']) -> Union['MapSum', 'DiffMapSum']: # if isinstance(other, np.ndarray): # return DiffMapBias(self, other) if isinstance(other, DifferentiableMap): return DiffMapSum(self, other) elif isinstance(other, Map): return MapSum(self, other) else: raise NotImplementedError def __radd__(self, other: Union['Map', 'DifferentiableMap']) -> Union['MapSum', 'DiffMapSum']: # if isinstance(other, np.ndarray): # return DiffMapBias(self, other) if isinstance(other, DifferentiableMap): return DiffMapSum(self, other) elif isinstance(other, Map): return MapSum(self, other) else: raise NotImplementedError def __mul__(self, other: Union[Number, 'Map', 'DifferentiableMap', np.ndarray]) \ -> Union['MapComp', 'DiffMapComp', np.ndarray]: if isinstance(other, Number): from pycsou.linop.base import HomothetyMap other = HomothetyMap(constant=other, size=self.shape[1]) if isinstance(other, np.ndarray): return self(other) elif isinstance(other, DifferentiableMap): return DiffMapComp(self, other) elif isinstance(other, Map): return MapComp(self, other) else: raise NotImplementedError def __rmul__(self, other: Union[Number, 'Map', 'DifferentiableMap']) -> Union['MapComp', 'DiffMapComp']: if isinstance(other, Number): from pycsou.linop.base import HomothetyMap other = HomothetyMap(constant=other, size=self.shape[0]) if isinstance(other, DifferentiableMap): return DiffMapComp(other, self) elif isinstance(other, Map): return MapComp(other, self) else: raise NotImplementedError
class DiffMapShifted(MapShifted, DifferentiableMap): def __init__(self, map: DifferentiableMap, shift: Union[Number, np.ndarray]): MapShifted.__init__(self, map=map, shift=shift) DifferentiableMap.__init__(self, shape=self.shape, is_linear=self.is_linear, lipschitz_cst=self.map.lipschitz_cst, diff_lipschitz_cst=self.map.diff_lipschitz_cst) def jacobianT(self, arg: Union[Number, np.ndarray]) -> 'LinearOperator': return self.map.jacobianT(arg + self.shift) class DiffMapSum(MapSum, DifferentiableMap): def __init__(self, map1: DifferentiableMap, map2: DifferentiableMap): MapSum.__init__(self, map1=map1, map2=map2) DifferentiableMap.__init__(self, shape=self.shape, is_linear=self.is_linear, lipschitz_cst=self.map1.lipschitz_cst + self.map2.lipschitz_cst, diff_lipschitz_cst=self.map1.diff_lipschitz_cst + self.map2.diff_lipschitz_cst) def jacobianT(self, arg: Union[Number, np.ndarray]) -> 'LinearOperator': return self.map1.jacobianT(arg) + self.map2.jacobianT(arg) # class DiffMapBias(MapBias, DifferentiableMap): # def __init__(self, map_: DifferentiableMap, bias: np.ndarray): # MapBias.__init__(self, map_=map_, bias=bias) # DifferentiableMap.__init__(self, shape=self.shape, is_linear=False, lipschitz_cst=self.map.lipschitz_cst, # diff_lipschitz_cst=self.map.diff_lipschitz_cst) # # def jacobianT(self, arg: Union[Number, np.ndarray]) -> 'LinearOperator': # return self.map.jacobianT(arg) * self.map2.jacobianT(arg) * self.map1.jacobianT(self.map2(arg)) class DiffMapComp(MapComp, DifferentiableMap): def __init__(self, map1: DifferentiableMap, map2: DifferentiableMap): from pycsou.linop.base import HomothetyMap MapComp.__init__(self, map1=map1, map2=map2) lipschitz_cst = self.map2.lipschitz_cst * self.map1.lipschitz_cst if isinstance(map1, HomothetyMap): diff_lipschitz_cst = self.map1.diff_lipschitz_cst * self.map2.diff_lipschitz_cst else: diff_lipschitz_cst = self.map1.diff_lipschitz_cst * self.map2.diff_lipschitz_cst * self.map2.lipschitz_cst DifferentiableMap.__init__(self, shape=self.shape, is_linear=self.is_linear, lipschitz_cst=lipschitz_cst, diff_lipschitz_cst=diff_lipschitz_cst) def jacobianT(self, arg: Union[Number, np.ndarray]) -> 'LinearOperator': return self.map2.jacobianT(arg) * self.map1.jacobianT(self.map2(arg))
[docs]class MapStack(Map): r""" Stack maps together. This class constructs a map by stacking multiple maps together, either **vertically** (``axis=0``) or **horizontally** (``axis=1``): - **Vertical stacking**: Consider a collection :math:`\{L_i:\mathbb{R}^{N}\to \mathbb{R}^{M_i}, i=1,\ldots, k\}` of maps. Their vertical stacking is defined as the operator .. math:: V:\begin{cases}\mathbb{R}^{N}\to \mathbb{R}^{M_1}\times \cdots \times\mathbb{R}^{M_k}\\ \mathbf{x}\mapsto (L_1\mathbf{x},\ldots, L_k\mathbf{x}). \end{cases} - **Horizontal stacking**: Consider a collection :math:`\{L_i:\mathbb{R}^{N_i}\to \mathbb{R}^{M}, i=1,\ldots, k\}` of maps. Their horizontal stacking is defined as the operator .. math:: H:\begin{cases}\mathbb{R}^{N_1}\times \cdots \times\mathbb{R}^{N_k}\to \mathbb{R}^{M}\\ (\mathbf{x}_1,\ldots, \mathbf{x}_k)\mapsto \sum_{i=1}^k L_i \mathbf{x}_i. \end{cases} Examples -------- .. testsetup:: import numpy as np from pycsou.func.penalty import SquaredL2Norm from pycsou.linop.diff import FirstDerivative from pycsou.core.map import MapStack x = np.arange(10) y = np.arange(20) f = SquaredL2Norm(dim=x.size) g = SquaredL2Norm(dim=y.size) A1 = FirstDerivative(size=x.size) A2 = FirstDerivative(size=y.size, kind='centered') A3 = A1 / 2 K1 = f * A1 K2 = g * A2 K3 = A3 Consider three maps :math:`K_1:\mathbb{R}^{10}\to \mathbb{R}`, :math:`K_2:\mathbb{R}^{20}\to \mathbb{R}` and :math:`K_3:\mathbb{R}^{10}\to \mathbb{R}^{10}` .. doctest:: >>> V = MapStack(K1, K3, axis=0) >>> V.shape (11, 10) >>> np.allclose(V(x), np.concatenate((K1(x).flatten(), K3(x).flatten()))) True >>> parV = MapStack(K1, K3, axis=0, n_jobs=-1) >>> np.allclose(V(x), parV(x)) True >>> H = MapStack(K1,K2, axis=1) >>> H.shape (1, 30) >>> np.allclose(H(np.concatenate((x,y))), K1(x) + K2(y)) True >>> parH = MapStack(K1,K2, axis=1, n_jobs=-1) >>> np.allclose(H(np.concatenate((x,y))), parH(np.concatenate((x,y)))) True See Also -------- :py:class:`~pycsou.core.map.MapVStack`, :py:class:`~pycsou.core.map.MapHStack`, :py:class:`~pycsou.core.map.DiffMapStack` """
[docs] def __init__(self, *maps: Map, axis: int, n_jobs: int = 1, joblib_backend: str = 'loky'): r""" Parameters ---------- maps: Map List of maps to stack. axis: Stacking direction: 0 for vertical and 1 for horizontal stacking. n_jobs: int Number of cores to be used for parallel evaluation of the map stack. If ``n_jobs==1``, the map stack is evaluated sequentially, otherwise it is evaluated in parallel. Setting ``n_jobs=-1`` uses all available cores. joblib_backend: str Joblib backend (`more details here <https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html>`_). """ self.maps = list(maps) if (np.abs(axis) > 1): ValueError('Axis must be one of {0, 1,-1}.') self.axis = int(axis) self.is_linear_list = [map_.is_linear for map_ in self.maps] self.is_differentiable_list = [map_.is_differentiable for map_ in self.maps] self.shapes = np.array([map_.shape for map_ in self.maps]) self.block_sizes = [map_.shape[axis] for map_ in self.maps] self.sections = np.cumsum(self.block_sizes) self.n_jobs = n_jobs self.joblib_backend = joblib_backend if not self.is_valid_stack(): raise ValueError('Inconsistent map shapes for stacking.') Map.__init__(self, shape=self.get_shape(), is_linear=bool(np.prod(self.is_linear_list).astype(bool)), is_differentiable=bool(np.prod(self.is_differentiable_list).astype(bool)))
def __call__(self, x: Union[Number, np.ndarray]) -> Union[Number, np.ndarray]: if self.axis == 0: if self.n_jobs == 1: out_list = [map_.__call__(x).flatten() for map_ in self.maps] else: with job.Parallel(backend=self.joblib_backend, n_jobs=self.n_jobs, verbose=False) as parallel: out_list = parallel(job.delayed(map_.__call__)(x) for map_ in self.maps) out_list = [y.flatten() for y in out_list] return np.concatenate(out_list, axis=0) else: x_split = np.split(x, self.sections) if self.n_jobs == 1: result = 0 for i, map_ in enumerate(self.maps): result += map_.__call__(x_split[i]) else: with job.Parallel(backend=self.joblib_backend, n_jobs=self.n_jobs, verbose=False) as parallel: out_list = parallel(job.delayed(map_.__call__)(x_split[i]) for i, map_ in enumerate(self.maps)) result = np.sum(np.stack(out_list, axis=0), axis=0) return result def is_valid_stack(self) -> bool: col_sizes = [map_.shape[1 - self.axis] for map_ in self.maps] return np.unique(col_sizes).size == 1 def get_shape(self) -> Tuple[int, int]: sizes = [map_.shape[self.axis] for map_ in self.maps] if self.axis == 0: return (int(np.sum(sizes).astype(int)), self.maps[0].shape[1 - self.axis]) else: return (self.maps[0].shape[1 - self.axis], int(np.sum(sizes).astype(int)))
[docs]class MapVStack(MapStack): r""" Alias for vertical stacking, equivalent to ``MapStack(*maps, axis=0)``. Examples -------- .. testsetup:: from pycsou.core.map import MapVStack .. doctest:: >>> V1 = MapStack(K1, K3, axis=0) >>> V2 = MapVStack(K1, K3) >>> np.allclose(V1(x), V2(x)) True """
[docs] def __init__(self, *maps: Map, n_jobs: int = 1, joblib_backend: str = 'loky'): r""" Parameters ---------- maps: Map List of maps to stack. n_jobs: int Number of cores to be used for parallel evaluation of the map stack. If ``n_jobs==1``, the map stack is evaluated sequentially, otherwise it is evaluated in parallel. Setting ``n_jobs=-1`` uses all available cores. joblib_backend: str Joblib backend (`more details here <https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html>`_). """ super(MapVStack, self).__init__(*maps, axis=0, n_jobs=n_jobs, joblib_backend=joblib_backend)
[docs]class MapHStack(MapStack): r""" Alias for horizontal stacking, equivalent to ``MapStack(*maps, axis=1)``. Examples -------- .. testsetup:: from pycsou.core.map import MapHStack .. doctest:: >>> V1 = MapStack(K1, K2, axis=1) >>> V2 = MapHStack(K1, K2) >>> np.allclose(V1(np.arange(V1.shape[1])), V2(np.arange(V1.shape[1]))) True """
[docs] def __init__(self, *maps: Map, n_jobs: int = 1, joblib_backend: str = 'loky'): r""" Parameters ---------- maps: Map List of maps to stack. n_jobs: int Number of cores to be used for parallel evaluation of the map stack. If ``n_jobs==1``, the map stack is evaluated sequentially, otherwise it is evaluated in parallel. Setting ``n_jobs=-1`` uses all available cores. joblib_backend: str Joblib backend (`more details here <https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html>`_). """ super(MapHStack, self).__init__(*maps, axis=1, n_jobs=n_jobs, joblib_backend=joblib_backend)
[docs]class DiffMapStack(MapStack, DifferentiableMap): r""" Stack differentiable maps together. This class constructs a differentiable map by stacking multiple maps together, either **vertically** (``axis=0``) or **horizontally** (``axis=1``). The attributes ``lipschitz_cst``, ``diff_lipschitz_cst`` and the method ``jacobianT`` are inferred from those of the stacked maps. - **Vertical stacking**: Consider a collection :math:`\{L_i:\mathbb{R}^{N}\to \mathbb{R}^{M_i}, i=1,\ldots, k\}` of maps, with Lipschitz constants :math:`\{\beta_i\in\mathbb{R}_+\cup\{+\infty\}, i=1,\ldots, k\}`, Jacobian matrices :math:`\{\mathbf{J}_i(\mathbf{x})\in\mathbb{R}^{M_i\times N}, i=1,\ldots, k\}` for some :math:`\mathbf{x}\in\mathbb{R}^N`. Then the vertically stacked operator .. math:: V:\begin{cases}\mathbb{R}^{N}\to \mathbb{R}^{M_1}\times \cdots \times\mathbb{R}^{M_k}\\ \mathbf{x}\mapsto (L_1\mathbf{x},\ldots, L_k\mathbf{x}). \end{cases} has a Lipschitz constant bounded by :math:`\sqrt{\sum_{i=1}^k \beta_i^2}`. Moreover the Jacobian matrix of :math:`V` is obtained by stacking the individual Jacobian matrices vertically: .. math:: \mathbf{J}(\mathbf{x})=\left[\begin{array}{c}\mathbf{J}_1(\mathbf{x})\\\vdots\\ \mathbf{J}_k(\mathbf{x}) \end{array}\right]\in\mathbb{R}^{(\sum_{i=1}^k M_i)\times N}. - **Horizontal stacking**: Consider a collection :math:`\{L_i:\mathbb{R}^{N_i}\to \mathbb{R}^{M}, i=1,\ldots, k\}` of maps, with Lipschitz constants :math:`\{\beta_i\in\mathbb{R}_+\cup\{+\infty\}, i=1,\ldots, k\}`, Jacobian matrices :math:`\{\mathbf{J}_i(\mathbf{x}_i)\in\mathbb{R}^{M\times N_i}, i=1,\ldots, k\}` for some :math:`\mathbf{x}_i\in\mathbb{R}^{N_i}`. Then the horizontally stacked operator .. math:: H:\begin{cases}\mathbb{R}^{N_1}\times \cdots \times\mathbb{R}^{N_k}\to \mathbb{R}^{M}\\ (\mathbf{x}_1,\ldots, \mathbf{x}_k)\mapsto \sum_{i=1}^k L_i \mathbf{x}_i. \end{cases} has a Lipschitz constant bounded by :math:`\max_{i=1}^k \beta_i`. Moreover the Jacobian matrix of :math:`H` is obtained by stacking the individual Jacobian matrices horizontally: .. math:: \mathbf{J}(\mathbf{x}_1,\ldots,\mathbf{x}_k)=\left[\begin{array}{c}\mathbf{J}_1(\mathbf{x}_1)&\cdots& \mathbf{J}_k(\mathbf{x}_k) \end{array}\right]\in\mathbb{R}^{M\times (\sum_{i=1}^k N_i)}. Examples -------- .. testsetup:: import numpy as np from pycsou.func.penalty import SquaredL2Norm from pycsou.linop.diff import FirstDerivative from pycsou.core.map import MapStack, DiffMapStack x = np.arange(10) y = np.arange(20) f = SquaredL2Norm(dim=x.size) g = SquaredL2Norm(dim=y.size) A1 = FirstDerivative(size=x.size) A1.compute_lipschitz_cst() A2 = FirstDerivative(size=y.size, kind='centered') A2.compute_lipschitz_cst() A3 = A1 / 2 K1 = f * A1 K2 = g * A2 K3 = A3 .. doctest:: >>> V = DiffMapStack(K1, K3, axis=0) >>> np.allclose(V.lipschitz_cst, np.sqrt(K1.lipschitz_cst ** 2 + K3.lipschitz_cst ** 2)) True >>> parV = DiffMapStack(K1, K3, axis=0, n_jobs=-1) >>> np.allclose(V.jacobianT(x) * np.ones(shape=(V.jacobianT(x).shape[1])), parV.jacobianT(x) * np.ones(shape=(V.jacobianT(x).shape[1]))) True See Also -------- :py:class:`~pycsou.core.map.DiffMapVStack`, :py:class:`~pycsou.core.map.DiffMapHStack`, :py:class:`~pycsou.core.map.MapStack` """
[docs] def __init__(self, *diffmaps: DifferentiableMap, axis: int, n_jobs: int = 1, joblib_backend: str = 'loky'): r""" Parameters ---------- diffmaps: DifferentiableMap List of differentiable maps to be stacked axis: int Stacking direction: 0 for vertical and 1 for horizontal stacking. n_jobs: int Number of cores to be used for parallel evaluation of the map stack and its Jacobian matrix. If ``n_jobs==1``, the map stack and its Jacobian matrix are evaluated sequentially, otherwise they are evaluated in parallel. Setting ``n_jobs=-1`` uses all available cores. joblib_backend: str Joblib backend (`more details here <https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html>`_). """ MapStack.__init__(self, *diffmaps, axis=axis, n_jobs=n_jobs, joblib_backend=joblib_backend) if axis == 0: lipschitz_cst = np.sqrt(np.sum([diffmap.lipschitz_cst ** 2 for diffmap in self.maps])) diff_lipschitz_cst = np.sqrt(np.sum([diffmap.diff_lipschitz_cst ** 2 for diffmap in self.maps])) else: lipschitz_cst = np.max([diffmap.lipschitz_cst for diffmap in self.maps]) diff_lipschitz_cst = np.max([diffmap.diff_lipschitz_cst for diffmap in self.maps]) DifferentiableMap.__init__(self, shape=self.shape, is_linear=self.is_linear, lipschitz_cst=lipschitz_cst, diff_lipschitz_cst=diff_lipschitz_cst)
[docs] def jacobianT(self, arg: Union[Number, np.ndarray]) -> Union['LinOpHStack', 'LinOpVStack']: from pycsou.func.base import ExplicitLinearFunctional jacobianT_list = [] if self.axis == 0: from pycsou.linop.base import LinOpVStack for diffmap in self.maps: jacobian = diffmap.jacobianT(arg) if isinstance(jacobian, np.ndarray): jacobian = ExplicitLinearFunctional(jacobian, dtype=jacobian.dtype) jacobianT_list.append(jacobian) return LinOpVStack(*jacobianT_list, n_jobs=self.n_jobs, joblib_backend=self.joblib_backend) else: from pycsou.linop.base import LinOpHStack arg_split = np.split(arg, self.sections) for i, diffmap in enumerate(self.maps): jacobian = diffmap.jacobianT(arg_split[i]) if isinstance(jacobian, np.ndarray): jacobian = ExplicitLinearFunctional(jacobian, dtype=jacobian.dtype) jacobianT_list.append(jacobian) return LinOpHStack(*jacobianT_list, n_jobs=self.n_jobs, joblib_backend=self.joblib_backend)
[docs]class DiffMapVStack(DiffMapStack): r""" Alias for vertical stacking of differentiable maps, equivalent to ``DiffMapStack(*maps, axis=0)``. """
[docs] def __init__(self, *diffmaps: DifferentiableMap, n_jobs: int = 1, joblib_backend: str = 'loky'): r""" Parameters ---------- diffmaps: DifferentiableMap List of differentiable maps to be stacked n_jobs: int Number of cores to be used for parallel evaluation of the map stack and its Jacobian matrix. If ``n_jobs==1``, the map stack and its Jacobian matrix are evaluated sequentially, otherwise they are evaluated in parallel. Setting ``n_jobs=-1`` uses all available cores. joblib_backend: str Joblib backend (`more details here <https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html>`_). """ super(DiffMapVStack, self).__init__(*diffmaps, axis=0, n_jobs=n_jobs, joblib_backend=joblib_backend)
[docs]class DiffMapHStack(DiffMapStack): r""" Alias for horizontal stacking of differentiable maps, equivalent to ``DiffMapStack(*maps, axis=1)``. """
[docs] def __init__(self, *diffmaps: DifferentiableMap, n_jobs: int = 1, joblib_backend: str = 'loky'): r""" Parameters ---------- diffmaps: DifferentiableMap List of differentiable maps to be stacked. n_jobs: int Number of cores to be used for parallel evaluation of the map stack and its Jacobian matrix. If ``n_jobs==1``, the map stack and its Jacobian matrix are evaluated sequentially, otherwise they are evaluated in parallel. Setting ``n_jobs=-1`` uses all available cores. joblib_backend: str Joblib backend (`more details here <https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html>`_). """ super(DiffMapHStack, self).__init__(*diffmaps, axis=1, n_jobs=n_jobs, joblib_backend=joblib_backend)
if __name__ == '__main__': pass # import numpy as np # from pycsou.linop.base import DenseLinearOperator, LinOpStack, LinOpVStack, LinOpHStack, HomothetyMap # from pycsou.linop.conv import Convolve1D # from scipy import signal # # x1 = np.arange(10) # x2 = np.arange(20) # filter = signal.hann(5) # filter[filter.size // 2:] = 0 # L1 = Convolve1D(size=x1.size, filter=filter) # L2 = DenseLinearOperator(np.arange(x2.size * L1.shape[0]).reshape(L1.shape[0], x2.size)) # L3 = LinOpStack(L1, L2, axis=1) # L3(np.concatenate((x1, x2)))