Skip to content

brahmap.core.PointingLO

Bases: LinearOperator

Derived class from the one from the :class:LinearOperator in :mod:linop. It constitutes an interface for dealing with the projection operator (pointing matrix).

Since this can be represented as a sparse matrix, it is initialized \ by an array of observed pixels which resembles the (i,j) positions \ of the non-null elements of the matrix,obs_pixs.

Parameters

  • processed_samples: {:class:ProcessTimeSamples} the class (instantiated before :class:PointingLO)has already computed the :math:\cos 2\phi and :math:\sin 2\phi, we link the cos2phi and sin2phi attributes of this class to the :class:ProcessTimeSamples ones ;
Source code in brahmap/core/linearoperators.py
class PointingLO(LinearOperator):
    r"""Derived class from the one from the  :class:`LinearOperator` in :mod:`linop`.
    It constitutes an interface for dealing with the projection operator
    (pointing matrix).

    Since this can be represented as a sparse matrix, it is initialized \
    by an array of observed pixels which resembles the  ``(i,j)`` positions \
    of the non-null elements of  the matrix,``obs_pixs``.

    **Parameters**

    - ``processed_samples``: {:class:`ProcessTimeSamples`}
        the class (instantiated before :class:`PointingLO`)has already computed
        the :math:`\cos 2\phi` and :math:`\sin 2\phi`, we link the ``cos2phi`` and ``sin2phi``
        attributes of this class to the  :class:`ProcessTimeSamples` ones ;

    """

    def __init__(
        self,
        processed_samples: ProcessTimeSamples,
        solver_type: Union[None, SolverType] = None,
    ):
        if solver_type is None:
            self.__solver_type = processed_samples.solver_type
        else:
            MPI_RAISE_EXCEPTION(
                condition=(int(processed_samples.solver_type) < int(solver_type)),
                exception=ValueError,
                message="`solver_type` must be lower than or equal to the"
                "`solver_type` of `processed_samples` object",
            )
            self.__solver_type = solver_type

        self.new_npix = processed_samples.new_npix
        self.ncols = processed_samples.new_npix * self.solver_type
        self.nrows = processed_samples.nsamples

        self.pointings = processed_samples.pointings
        self.pointings_flag = processed_samples.pointings_flag

        if self.solver_type > 1:
            self.sin2phi = processed_samples.sin2phi
            self.cos2phi = processed_samples.cos2phi

        if self.solver_type == 1:
            super(PointingLO, self).__init__(
                nargin=self.ncols,
                nargout=self.nrows,
                symmetric=False,
                matvec=self._mult_I,
                rmatvec=self._rmult_I,
                dtype=processed_samples.dtype_float,
            )
        elif self.solver_type == 2:
            super(PointingLO, self).__init__(
                nargin=self.ncols,
                nargout=self.nrows,
                symmetric=False,
                matvec=self._mult_QU,
                rmatvec=self._rmult_QU,
                dtype=processed_samples.dtype_float,
            )
        else:
            super(PointingLO, self).__init__(
                nargin=self.ncols,
                nargout=self.nrows,
                matvec=self._mult_IQU,
                symmetric=False,
                rmatvec=self._rmult_IQU,
                dtype=processed_samples.dtype_float,
            )

    def _mult_I(self, vec: np.ndarray):
        r"""
        Performs the product of a sparse matrix :math:`Av`,\
         with :math:`v` a  :mod:`numpy`  array (:math:`dim(v)=n_{pix}`)  .

        It extracts the components of :math:`v` corresponding  to the non-null \
        elements of the operator.

        """

        MPI_RAISE_EXCEPTION(
            condition=(len(vec) != self.ncols),
            exception=ValueError,
            message=f"Dimensions of `vec` is not compatible with the dimension of this `PointingLO` instance.\nShape of `PointingLO` instance: {self.shape}\nShape of `vec`: {vec.shape}",
        )

        if vec.dtype != self.dtype:
            if MPI_UTILS.rank == 0:
                warnings.warn(
                    f"dtype of `vec` will be changed to {self.dtype}",
                    TypeChangeWarning,
                )
            vec = vec.astype(dtype=self.dtype, copy=False)

        prod = np.zeros(self.nrows, dtype=self.dtype)

        PointingLO_tools.PLO_mult_I(
            nsamples=self.nrows,
            pointings=self.pointings,
            pointings_flag=self.pointings_flag,
            vec=vec,
            prod=prod,
        )

        return prod

    def _rmult_I(self, vec: np.ndarray):
        r"""
        Performs the product for the transpose operator :math:`A^T`.

        """

        MPI_RAISE_EXCEPTION(
            condition=(len(vec) != self.nrows),
            exception=ValueError,
            message=f"Dimensions of `vec` is not compatible with the dimension of this `PointingLO` instance.\nShape of `PointingLO` instance: {self.shape}\nShape of `vec`: {vec.shape}",
        )

        if vec.dtype != self.dtype:
            if MPI_UTILS.rank == 0:
                warnings.warn(
                    f"dtype of `vec` will be changed to {self.dtype}",
                    TypeChangeWarning,
                )
            vec = vec.astype(dtype=self.dtype, copy=False)

        prod = np.zeros(self.ncols, dtype=self.dtype)

        PointingLO_tools.PLO_rmult_I(
            new_npix=self.new_npix,
            nsamples=self.nrows,
            pointings=self.pointings,
            pointings_flag=self.pointings_flag,
            vec=vec,
            prod=prod,
            comm=MPI_UTILS.comm,
        )

        return prod

    def _mult_QU(self, vec: np.ndarray):
        r"""Performs :math:`A * v` with :math:`v` being a *polarization* vector.
        The output array will encode a linear combination of the two Stokes
        parameters,  (whose components are stored contiguously).

        .. math::
            d_t=  Q_p \cos(2\phi_t)+ U_p \sin(2\phi_t).
        """

        MPI_RAISE_EXCEPTION(
            condition=(len(vec) != self.ncols),
            exception=ValueError,
            message=f"Dimensions of `vec` is not compatible with the dimension of this `PointingLO` instance.\nShape of `PointingLO` instance: {self.shape}\nShape of `vec`: {vec.shape}",
        )

        if vec.dtype != self.dtype:
            if MPI_UTILS.rank == 0:
                warnings.warn(
                    f"dtype of `vec` will be changed to {self.dtype}",
                    TypeChangeWarning,
                )
            vec = vec.astype(dtype=self.dtype, copy=False)

        prod = np.zeros(self.nrows, dtype=self.dtype)

        PointingLO_tools.PLO_mult_QU(
            nsamples=self.nrows,
            pointings=self.pointings,
            pointings_flag=self.pointings_flag,
            sin2phi=self.sin2phi,
            cos2phi=self.cos2phi,
            vec=vec,
            prod=prod,
        )

        return prod

    def _rmult_QU(self, vec: np.ndarray):
        r"""
        Performs :math:`A^T * v`. The output vector will be a QU-map-like array.
        """

        MPI_RAISE_EXCEPTION(
            condition=(len(vec) != self.nrows),
            exception=ValueError,
            message=f"Dimensions of `vec` is not compatible with the dimension of this `PointingLO` instance.\nShape of `PointingLO` instance: {self.shape}\nShape of `vec`: {vec.shape}",
        )

        if vec.dtype != self.dtype:
            if MPI_UTILS.rank == 0:
                warnings.warn(
                    f"dtype of `vec` will be changed to {self.dtype}",
                    TypeChangeWarning,
                )
            vec = vec.astype(dtype=self.dtype, copy=False)

        prod = np.zeros(self.ncols, dtype=self.dtype)

        PointingLO_tools.PLO_rmult_QU(
            new_npix=self.new_npix,
            nsamples=self.nrows,
            pointings=self.pointings,
            pointings_flag=self.pointings_flag,
            sin2phi=self.sin2phi,
            cos2phi=self.cos2phi,
            vec=vec,
            prod=prod,
            comm=MPI_UTILS.comm,
        )

        return prod

    def _mult_IQU(self, vec: np.ndarray):
        r"""Performs the product of a sparse matrix :math:`Av`,
        with ``v`` a  :mod:`numpy` array containing the
        three Stokes parameters [IQU] .

        .. note::
            Compared to the operation ``mult`` this routine returns a
            :math:`n_t`-size vector defined as:

            .. math::
                d_t= I_p + Q_p \cos(2\phi_t)+ U_p \sin(2\phi_t).

            with :math:`p` is the pixel observed at time :math:`t` with polarization angle
            :math:`\phi_t`.
        """

        MPI_RAISE_EXCEPTION(
            condition=(len(vec) != self.ncols),
            exception=ValueError,
            message=f"Dimensions of `vec` is not compatible with the dimension of this `PointingLO` instance.\nShape of `PointingLO` instance: {self.shape}\nShape of `vec`: {vec.shape}",
        )

        if vec.dtype != self.dtype:
            if MPI_UTILS.rank == 0:
                warnings.warn(
                    f"dtype of `vec` will be changed to {self.dtype}",
                    TypeChangeWarning,
                )
            vec = vec.astype(dtype=self.dtype, copy=False)

        prod = np.zeros(self.nrows, dtype=self.dtype)

        PointingLO_tools.PLO_mult_IQU(
            nsamples=self.nrows,
            pointings=self.pointings,
            pointings_flag=self.pointings_flag,
            sin2phi=self.sin2phi,
            cos2phi=self.cos2phi,
            vec=vec,
            prod=prod,
        )

        return prod

    def _rmult_IQU(self, vec: np.ndarray):
        r"""
        Performs the product for the transpose operator :math:`A^T` to get a IQU map-like vector.
        Since this vector resembles the pixel of 3 maps it has 3 times the size ``Npix``.
        IQU values referring to the same pixel are  contiguously stored in the memory.

        """

        MPI_RAISE_EXCEPTION(
            condition=(len(vec) != self.nrows),
            exception=ValueError,
            message=f"Dimensions of `vec` is not compatible with the dimension of this `PointingLO` instance.\nShape of `PointingLO` instance: {self.shape}\nShape of `vec`: {vec.shape}",
        )

        if vec.dtype != self.dtype:
            if MPI_UTILS.rank == 0:
                warnings.warn(
                    f"dtype of `vec` will be changed to {self.dtype}",
                    TypeChangeWarning,
                )
            vec = vec.astype(dtype=self.dtype, copy=False)

        prod = np.zeros(self.ncols, dtype=self.dtype)

        PointingLO_tools.PLO_rmult_IQU(
            new_npix=self.new_npix,
            nsamples=self.nrows,
            pointings=self.pointings,
            pointings_flag=self.pointings_flag,
            sin2phi=self.sin2phi,
            cos2phi=self.cos2phi,
            vec=vec,
            prod=prod,
            comm=MPI_UTILS.comm,
        )

        return prod

    @property
    def solver_type(self):
        return self.__solver_type