Skip to content

Utilities functions

This module contains the utilities functions strictly related to the computation, and input/output.

ProcessTimeSamples

Bases: object

This class precomputes quantities needed during the analysis once the input file have been read. During its initialization, a private member function :func:initializeweights is called to precompute arrays needed for the explicit implementation of :class:BlockDiagonalPreconditionerLO and :class:BlockDiagonalLO. Moreover it masks all the unobserved or pathological pixels which won't be taken into account, via the functions :func:repixelization and :func:flagging_samples. .. note::

This the reason why the value ``npix`` has to be updated after the removal
 of the pathological pixels.

Parameters

  • npix : {int} total number of pixels that could be observed;
  • pixs : {array} list of pixels observed in the time domain;
  • pol : {int,[default pol=1]} process an intensity only (pol=1), polarization only pol=2 and intensity+polarization map (pol=3);
  • phi: {array, [default None]} array with polarization angles (needed if pol=3,2);
  • w: {array, [default None]} array with noise weights , :math:w_t= N^{-1} _{tt}, computed by :func:BlockLO.build_blocks. If it is not set :func:ProcessTimeSamples.initializeweights assumes it to be a :func:numpy.ones array;
  • obspix:{array} Map from the internal pixelization to an external one, i.e. HEALPIX, it has to be modified when pathological pixels are not taken into account; Default is :func:numpy.arange(npix);
  • threshold_cond: {float} set the condition number threshold to mask bad conditioned pixels (it's used in polarization cases). Default is set to 1.e3.
Source code in brahmap/utilities/process_ces.py
class ProcessTimeSamples(object):
    """
    This class  precomputes quantities needed during the analysis once the input file have been read.
    During its initialization,  a private member function :func:`initializeweights`
    is called to precompute arrays needed for the explicit implementation of :class:`BlockDiagonalPreconditionerLO`
     and :class:`BlockDiagonalLO`.
    Moreover it masks all the unobserved or pathological pixels which won't be taken into account,
    via the functions  :func:`repixelization`  and   :func:`flagging_samples`.
    .. note::

        This the reason why the value ``npix`` has to be updated after the removal
         of the pathological pixels.

    **Parameters**

    - ``npix`` : {int}
        total number of pixels that could be observed;
    - ``pixs`` : {array}
        list of pixels observed in the time domain;
    - ``pol`` : {int,[*default* `pol=1`]}
        process an intensity only (``pol=1``), polarization only ``pol=2``
        and intensity+polarization map (``pol=3``);
    - ``phi``: {array, [*default* `None`]}
        array with polarization angles (needed if ``pol=3,2``);
    - ``w``: {array, [*default* `None`]}
        array with noise weights , :math:`w_t= N^{-1} _{tt}`, computed by
        :func:`BlockLO.build_blocks`.   If it is  not set :func:`ProcessTimeSamples.initializeweights`
        assumes it to be a :func:`numpy.ones` array;
    - ``obspix``:{array}
        Map from the internal pixelization to an external one, i.e. HEALPIX, it has to be modified when
        pathological pixels are not taken into account;
        Default is :func:`numpy.arange(npix)`;
    - ``threshold_cond``: {float}
        set the condition number threshold to mask bad conditioned pixels (it's used in polarization cases).
        Default is set to 1.e3.


    """

    def __init__(
        self,
        pixs,
        npix,
        obspix=None,
        pol=1,
        phi=None,
        w=None,
        ground=None,
        threshold_cond=1.0e3,
        obspix2=None,
    ):
        self.pixs = pixs

        self.oldnpix = npix
        self.nsamples = len(pixs)
        self.pol = pol
        self.bashc = bash_colors()
        if w is None:
            w = np.ones(self.nsamples)
        if obspix is None:
            obspix = np.arange(self.nsamples)
        self.obspix = obspix
        if ground is not None:
            neg_groundbins = np.ma.masked_less(ground, 0)
            ground[neg_groundbins.mask] = -1
            pixs[neg_groundbins.mask] = -1
        if obspix2 is None:
            self.threshold = threshold_cond
            self.initializeweights(phi, w)
            self.new_repixelization()
            # self.repixelization()
            self.flagging_samples()
        else:
            self.SetObspix(obspix2)
            self.flagging_samples()
            self.compute_arrays(phi, w)

        if ground is not None:
            # flag the ground array as the pixs one is
            flags = np.ma.masked_equal(pixs, -1)
            ground[flags.mask] = -1
            self.ground = ground

    @property
    def get_new_pixel(self):
        return self.__new_npix, self.obspix

    def SetObspix(self, new_obspix):
        self.old2new = np.full(self.oldnpix, -1, dtype=np.int32)
        if not (is_sorted(self.obspix) and is_sorted(new_obspix)):
            print(self.bashc.warning("Obspix isn't sorted, sorting it.."))
            indexsorted = np.argsort(self.obspix, kind="quicksort")
            self.obspix = self.obspix[indexsorted]

        index_of_new_in_old_obspix = np.searchsorted(self.obspix, new_obspix)
        self.old2new[index_of_new_in_old_obspix] = np.arange(
            len(index_of_new_in_old_obspix)
        )
        try:
            assert np.allclose(new_obspix, self.obspix[index_of_new_in_old_obspix])
        except Exception:
            print(
                self.bashc.fail(
                    "new_obspix contains pixels which aren't in the old obspix"
                )
            )

        self.obspix = new_obspix
        self.__new_npix = len(new_obspix)
        print(self.bashc.bold("NT=%d\tNPIX=%d" % (self.nsamples, self.__new_npix)))

    def compute_arrays(self, phi, w):
        npix = self.__new_npix
        N = self.nsamples
        pixs = self.pixs
        if self.pol == 1:
            self.counts = np.zeros(npix)
            counts = self.counts

            for i in range(N):
                pixel = pixs[i]
                if pixel == -1:
                    continue
                counts[pixel] += w[i]
        else:
            self.cos = np.cos(2.0 * phi)
            self.sin = np.sin(2.0 * phi)
            self.cos2 = np.zeros(npix)
            self.sin2 = np.zeros(npix)
            self.sincos = np.zeros(npix)
            cos, sin = self.cos, self.sin
            cos2, sin2, sincos = self.cos2, self.sin2, self.sincos
            if self.pol == 2:
                for i in range(N):
                    pixel = pixs[i]
                    if pixel == -1:
                        continue
                    cos2[pixel] += w[i] * cos[i] * cos[i]
                    sin2[pixel] += w[i] * sin[i] * sin[i]
                    sincos[pixel] += w[i] * sin[i] * cos[i]

            elif self.pol == 3:
                self.counts = np.zeros(npix)
                self.cosine = np.zeros(npix)
                self.sine = np.zeros(npix)
                counts, cosine, sine = self.counts, self.cosine, self.sine

                for i in range(N):
                    pixel = pixs[i]
                    if pixel == -1:
                        continue
                    counts[pixel] += w[i]
                    cosine[pixel] += w[i] * cos[i]
                    sine[pixel] += w[i] * sin[i]
                    cos2[pixel] += w[i] * cos[i] * cos[i]
                    sin2[pixel] += w[i] * sin[i] * sin[i]
                    sincos[pixel] += w[i] * sin[i] * cos[i]

    def new_repixelization(self):
        if self.pol == 1:
            (
                n_new_pix,
                n_removed_pix,
                old2new,
                self.counts,
                self.obspix,
            ) = repixelize.py_repixelization_pol1(
                self.oldnpix, self.mask, self.counts, self.obspix
            )

            self.counts = np.delete(self.counts, range(n_new_pix, self.oldnpix))

        elif self.pol == 2:
            (
                n_new_pix,
                n_removed_pix,
                old2new,
                self.counts,
                self.obspix,
                self.sin2,
                self.cos2,
                self.sincos,
            ) = repixelize.py_repixelization_pol2(
                self.oldnpix,
                self.mask,
                self.counts,
                self.obspix,
                self.sin2,
                self.cos2,
                self.sincos,
            )

            self.cos2 = np.delete(self.cos2, range(n_new_pix, self.oldnpix))
            self.sin2 = np.delete(self.sin2, range(n_new_pix, self.oldnpix))
            self.sincos = np.delete(self.sincos, range(n_new_pix, self.oldnpix))
        elif self.pol == 3:
            (
                n_new_pix,
                n_removed_pix,
                old2new,
                self.counts,
                self.obspix,
                self.sin2,
                self.cos2,
                self.sincos,
                self.sine,
                self.cosine,
            ) = repixelize.py_repixelization_pol3(
                self.oldnpix,
                self.mask,
                self.counts,
                self.obspix,
                self.sin2,
                self.cos2,
                self.sincos,
                self.sine,
                self.cosine,
            )

            self.cos2 = np.delete(self.cos2, range(n_new_pix, self.oldnpix))
            self.sin2 = np.delete(self.sin2, range(n_new_pix, self.oldnpix))
            self.sincos = np.delete(self.sincos, range(n_new_pix, self.oldnpix))
            self.sine = np.delete(self.sine, range(n_new_pix, self.oldnpix))
            self.cosine = np.delete(self.cosine, range(n_new_pix, self.oldnpix))
            self.counts = np.delete(self.counts, range(n_new_pix, self.oldnpix))

        print(self.bashc.header("___" * 30))
        print(
            self.bashc.blue(
                "Found %d pathological pixels\nRepixelizing  w/ %d pixels."
                % (n_removed_pix, n_new_pix)
            )
        )
        print(self.bashc.header("___" * 30))
        # resizing all the arrays
        self.obspix = np.delete(self.obspix, range(n_new_pix, self.oldnpix))
        self.old2new = old2new
        self.__new_npix = n_new_pix
        print(self.bashc.bold("NT=%d\tNPIX=%d" % (self.nsamples, self.__new_npix)))

    def repixelization(self):
        """
        Performs pixel reordering by excluding all the unbserved or
        pathological pixels.
        """
        n_new_pix = 0
        n_removed_pix = 0
        self.old2new = np.zeros(self.oldnpix, dtype=int)
        if self.pol == 1:
            for jpix in range(self.oldnpix):
                if jpix in self.mask:
                    self.old2new[jpix] = n_new_pix
                    self.counts[n_new_pix] = self.counts[jpix]
                    self.obspix[n_new_pix] = self.obspix[jpix]
                    n_new_pix += 1
                else:
                    self.old2new[jpix] = -1
                    n_removed_pix += 1
            # resize array
            self.counts = np.delete(self.counts, range(n_new_pix, self.oldnpix))
        else:
            for jpix in range(self.oldnpix):
                if jpix in self.mask:
                    self.old2new[jpix] = n_new_pix
                    self.obspix[n_new_pix] = self.obspix[jpix]
                    self.cos2[n_new_pix] = self.cos2[jpix]
                    self.sin2[n_new_pix] = self.sin2[jpix]
                    self.sincos[n_new_pix] = self.sincos[jpix]
                    if self.pol == 3:
                        self.counts[n_new_pix] = self.counts[jpix]
                        self.sine[n_new_pix] = self.sine[jpix]
                        self.cosine[n_new_pix] = self.cosine[jpix]
                    n_new_pix += 1
                else:
                    self.old2new[jpix] = -1
                    n_removed_pix += 1
            # resize
            self.cos2 = np.delete(self.cos2, range(n_new_pix, self.oldnpix))
            self.sin2 = np.delete(self.sin2, range(n_new_pix, self.oldnpix))
            self.sincos = np.delete(self.sincos, range(n_new_pix, self.oldnpix))
            if self.pol == 3:
                self.counts = np.delete(self.counts, range(n_new_pix, self.oldnpix))
                self.sine = np.delete(self.sine, range(n_new_pix, self.oldnpix))
                self.cosine = np.delete(self.cosine, range(n_new_pix, self.oldnpix))
        print(self.bashc.header("___" * 30))
        print(
            self.bashc.blue(
                "Found %d pathological pixels\nRepixelizing  w/ %d pixels."
                % (n_removed_pix, n_new_pix)
            )
        )
        print(self.bashc.header("___" * 30))
        # resizing all the arrays
        self.obspix = np.delete(self.obspix, range(n_new_pix, self.oldnpix))
        self.__new_npix = n_new_pix
        print(self.bashc.bold("NT=%d\tNPIX=%d" % (self.nsamples, self.__new_npix)))

    def flagging_samples(self):
        """
        Flags the time samples related to bad pixels to -1.
        """
        N = self.nsamples
        o2n = self.old2new

        pixs = self.pixs

        for i in range(N):
            pixel = pixs[i]
            if pixel == -1:
                continue
            pixs[i] = o2n[pixel]

    def initializeweights(self, phi, w):
        r"""
        Pre-compute the quantitities needed for the implementation of :math:`(A^T A)`
        and to masks bad pixels.

        **Parameters**

        - ``counts`` :
            how many times a given pixel is observed in the timestream;
        - ``mask``:
            mask  either unobserved  (``counts=0``)  or   bad constrained pixels
            (see the ``pol=3,2`` following cases) ;
        - *If* ``pol=2``:
            the matrix :math:`(A^T A)`  is  symmetric and block-diagonal, each block
            can be written as :

            .. csv-table::

                ":math:`\sum_t cos^2 2 \phi_t`", ":math:`\sum_t sin 2\phi_t cos 2 \phi_t`"
                ":math:`\sum_t sin2 \phi_t cos 2 \phi_t`",   ":math:`\sum_t sin^2 2 \phi_t`"

            the determinant, the trace are therefore needed to compute the  eigenvalues
            of each block via the formula:

            .. math::
                \lambda_{min,max}= Tr(M)/2 \pm \sqrt{Tr^2(M)/4 - det(M)}

            being :math:`M` a ``2x2`` matrix.
            The eigenvalues are needed to define the mask of bad constrained pixels whose
            condition number is :math:`\gg 1`.

        - *If*  ``pol=3``*:
            each block of the matrix :math:`(A^T A)`  is a ``3 x 3`` matrix:

            .. csv-table::

                ":math:`n_{hits}`", ":math:`\sum_t cos 2 \phi_t`", ":math:`\sum_t sin 2 \phi_t`"
                ":math:`\sum_t cos 2 \phi_t`", ":math:`\sum_t cos^2 2 \phi_t`", ":math:`\sum_t sin 2\phi_t cos 2 \phi_t`"
                ":math:`\sum_t sin 2 \phi_t`",  ":math:`\sum_t sin2 \phi_t cos 2 \phi_t`",   ":math:`\sum_t sin^2 2 \phi_t`"

            We then define the mask of bad constrained pixels by both  considering
            the condition number similarly as in the ``pol=2`` case and the pixels
            whose count is :math:`\geq 3`.

        """

        if self.pol == 1:
            self.counts, self.mask = process_samples.py_process_pol1(
                self.nsamples, self.oldnpix, w, self.pixs
            )
        else:
            if self.pol == 2:
                (
                    self.counts,
                    self.sin,
                    self.cos,
                    self.sin2,
                    self.cos2,
                    self.sincos,
                ) = process_samples.py_process_pol2(
                    self.nsamples, self.oldnpix, w, self.pixs, phi
                )

            elif self.pol == 3:
                (
                    self.counts,
                    self.sine,
                    self.cosine,
                    self.sin,
                    self.cos,
                    self.sin2,
                    self.cos2,
                    self.sincos,
                ) = process_samples.py_process_pol3(
                    self.nsamples, self.oldnpix, w, self.pixs, phi
                )

            self.mask = process_samples.py_get_mask_pol(
                self.pol, self.counts, self.sin2, self.cos2, self.sincos, self.threshold
            )

flagging_samples()

Flags the time samples related to bad pixels to -1.

Source code in brahmap/utilities/process_ces.py
def flagging_samples(self):
    """
    Flags the time samples related to bad pixels to -1.
    """
    N = self.nsamples
    o2n = self.old2new

    pixs = self.pixs

    for i in range(N):
        pixel = pixs[i]
        if pixel == -1:
            continue
        pixs[i] = o2n[pixel]

initializeweights(phi, w)

Pre-compute the quantitities needed for the implementation of :math:(A^T A) and to masks bad pixels.

Parameters

  • counts : how many times a given pixel is observed in the timestream;
  • mask: mask either unobserved (counts=0) or bad constrained pixels (see the pol=3,2 following cases) ;
  • If pol=2: the matrix :math:(A^T A) is symmetric and block-diagonal, each block can be written as :

    .. csv-table::

    ":math:`\sum_t cos^2 2 \phi_t`", ":math:`\sum_t sin 2\phi_t cos 2 \phi_t`"
    ":math:`\sum_t sin2 \phi_t cos 2 \phi_t`",   ":math:`\sum_t sin^2 2 \phi_t`"
    

    the determinant, the trace are therefore needed to compute the eigenvalues of each block via the formula:

    .. math:: \lambda_{min,max}= Tr(M)/2 \pm \sqrt{Tr^2(M)/4 - det(M)}

    being :math:M a 2x2 matrix. The eigenvalues are needed to define the mask of bad constrained pixels whose condition number is :math:\gg 1.

  • If pol=3*: each block of the matrix :math:(A^T A) is a 3 x 3 matrix:

    .. csv-table::

    ":math:`n_{hits}`", ":math:`\sum_t cos 2 \phi_t`", ":math:`\sum_t sin 2 \phi_t`"
    ":math:`\sum_t cos 2 \phi_t`", ":math:`\sum_t cos^2 2 \phi_t`", ":math:`\sum_t sin 2\phi_t cos 2 \phi_t`"
    ":math:`\sum_t sin 2 \phi_t`",  ":math:`\sum_t sin2 \phi_t cos 2 \phi_t`",   ":math:`\sum_t sin^2 2 \phi_t`"
    

    We then define the mask of bad constrained pixels by both considering the condition number similarly as in the pol=2 case and the pixels whose count is :math:\geq 3.

Source code in brahmap/utilities/process_ces.py
def initializeweights(self, phi, w):
    r"""
    Pre-compute the quantitities needed for the implementation of :math:`(A^T A)`
    and to masks bad pixels.

    **Parameters**

    - ``counts`` :
        how many times a given pixel is observed in the timestream;
    - ``mask``:
        mask  either unobserved  (``counts=0``)  or   bad constrained pixels
        (see the ``pol=3,2`` following cases) ;
    - *If* ``pol=2``:
        the matrix :math:`(A^T A)`  is  symmetric and block-diagonal, each block
        can be written as :

        .. csv-table::

            ":math:`\sum_t cos^2 2 \phi_t`", ":math:`\sum_t sin 2\phi_t cos 2 \phi_t`"
            ":math:`\sum_t sin2 \phi_t cos 2 \phi_t`",   ":math:`\sum_t sin^2 2 \phi_t`"

        the determinant, the trace are therefore needed to compute the  eigenvalues
        of each block via the formula:

        .. math::
            \lambda_{min,max}= Tr(M)/2 \pm \sqrt{Tr^2(M)/4 - det(M)}

        being :math:`M` a ``2x2`` matrix.
        The eigenvalues are needed to define the mask of bad constrained pixels whose
        condition number is :math:`\gg 1`.

    - *If*  ``pol=3``*:
        each block of the matrix :math:`(A^T A)`  is a ``3 x 3`` matrix:

        .. csv-table::

            ":math:`n_{hits}`", ":math:`\sum_t cos 2 \phi_t`", ":math:`\sum_t sin 2 \phi_t`"
            ":math:`\sum_t cos 2 \phi_t`", ":math:`\sum_t cos^2 2 \phi_t`", ":math:`\sum_t sin 2\phi_t cos 2 \phi_t`"
            ":math:`\sum_t sin 2 \phi_t`",  ":math:`\sum_t sin2 \phi_t cos 2 \phi_t`",   ":math:`\sum_t sin^2 2 \phi_t`"

        We then define the mask of bad constrained pixels by both  considering
        the condition number similarly as in the ``pol=2`` case and the pixels
        whose count is :math:`\geq 3`.

    """

    if self.pol == 1:
        self.counts, self.mask = process_samples.py_process_pol1(
            self.nsamples, self.oldnpix, w, self.pixs
        )
    else:
        if self.pol == 2:
            (
                self.counts,
                self.sin,
                self.cos,
                self.sin2,
                self.cos2,
                self.sincos,
            ) = process_samples.py_process_pol2(
                self.nsamples, self.oldnpix, w, self.pixs, phi
            )

        elif self.pol == 3:
            (
                self.counts,
                self.sine,
                self.cosine,
                self.sin,
                self.cos,
                self.sin2,
                self.cos2,
                self.sincos,
            ) = process_samples.py_process_pol3(
                self.nsamples, self.oldnpix, w, self.pixs, phi
            )

        self.mask = process_samples.py_get_mask_pol(
            self.pol, self.counts, self.sin2, self.cos2, self.sincos, self.threshold
        )

repixelization()

Performs pixel reordering by excluding all the unbserved or pathological pixels.

Source code in brahmap/utilities/process_ces.py
def repixelization(self):
    """
    Performs pixel reordering by excluding all the unbserved or
    pathological pixels.
    """
    n_new_pix = 0
    n_removed_pix = 0
    self.old2new = np.zeros(self.oldnpix, dtype=int)
    if self.pol == 1:
        for jpix in range(self.oldnpix):
            if jpix in self.mask:
                self.old2new[jpix] = n_new_pix
                self.counts[n_new_pix] = self.counts[jpix]
                self.obspix[n_new_pix] = self.obspix[jpix]
                n_new_pix += 1
            else:
                self.old2new[jpix] = -1
                n_removed_pix += 1
        # resize array
        self.counts = np.delete(self.counts, range(n_new_pix, self.oldnpix))
    else:
        for jpix in range(self.oldnpix):
            if jpix in self.mask:
                self.old2new[jpix] = n_new_pix
                self.obspix[n_new_pix] = self.obspix[jpix]
                self.cos2[n_new_pix] = self.cos2[jpix]
                self.sin2[n_new_pix] = self.sin2[jpix]
                self.sincos[n_new_pix] = self.sincos[jpix]
                if self.pol == 3:
                    self.counts[n_new_pix] = self.counts[jpix]
                    self.sine[n_new_pix] = self.sine[jpix]
                    self.cosine[n_new_pix] = self.cosine[jpix]
                n_new_pix += 1
            else:
                self.old2new[jpix] = -1
                n_removed_pix += 1
        # resize
        self.cos2 = np.delete(self.cos2, range(n_new_pix, self.oldnpix))
        self.sin2 = np.delete(self.sin2, range(n_new_pix, self.oldnpix))
        self.sincos = np.delete(self.sincos, range(n_new_pix, self.oldnpix))
        if self.pol == 3:
            self.counts = np.delete(self.counts, range(n_new_pix, self.oldnpix))
            self.sine = np.delete(self.sine, range(n_new_pix, self.oldnpix))
            self.cosine = np.delete(self.cosine, range(n_new_pix, self.oldnpix))
    print(self.bashc.header("___" * 30))
    print(
        self.bashc.blue(
            "Found %d pathological pixels\nRepixelizing  w/ %d pixels."
            % (n_removed_pix, n_new_pix)
        )
    )
    print(self.bashc.header("___" * 30))
    # resizing all the arrays
    self.obspix = np.delete(self.obspix, range(n_new_pix, self.oldnpix))
    self.__new_npix = n_new_pix
    print(self.bashc.bold("NT=%d\tNPIX=%d" % (self.nsamples, self.__new_npix)))

bash_colors

This class contains the necessary definitions to print to bash screen with colors. Sometimes it can be useful...

Source code in brahmap/utilities/utilities_functions.py
class bash_colors:
    """
    This class contains the necessary definitions to print to bash
    screen with colors. Sometimes it can be useful...
    """

    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"
    ENDC = "\033[0m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

    def header(self, string):
        return self.HEADER + str(string) + self.ENDC

    def blue(self, string):
        return self.OKBLUE + str(string) + self.ENDC

    def green(self, string):
        return self.OKGREEN + str(string) + self.ENDC

    def warning(self, string):
        return self.WARNING + str(string) + self.ENDC

    def fail(self, string):
        return self.FAIL + str(string) + self.ENDC

    def bold(self, string):
        return self.BOLD + str(string) + self.ENDC

    def underline(self, string):
        return self.UNDERLINE + str(string) + self.ENDC

angles_gen(theta0, n, sample_freq=200.0, whwp_freq=2.5)

Generate polarization angle given the sample frequency of the instrument, the frequency of HWP and the size n of the timestream.

Source code in brahmap/utilities/utilities_functions.py
def angles_gen(theta0, n, sample_freq=200.0, whwp_freq=2.5):
    """
    Generate  polarization angle given the sample frequency of the instrument,
    the frequency of HWP and the size ``n`` of the timestream.


    """
    # print theta0,sample_freq,whwp_freq,n
    return np.array(
        [theta0 + 2 * np.pi * whwp_freq / sample_freq * i for i in range(n)]
    )

filter_warnings(wfilter)

wfilter: {string} - "ignore": never print matching warnings; - "always": always print matching warnings

Source code in brahmap/utilities/utilities_functions.py
def filter_warnings(wfilter):
    """
    wfilter: {string}
    - "ignore": never print matching warnings;
    - "always": always print matching warnings

    """
    warnings.simplefilter(wfilter)

is_sorted(seq)

Check if sequence is sorted bool

Source code in brahmap/utilities/utilities_functions.py
def is_sorted(seq):
    """
    Check if sequence is sorted
    bool
    """
    return all(seq[i] <= seq[i + 1] for i in range(len(seq) - 1))

noise_val(nb, bandwidth=1)

Generate elements to fill the noise covariance matrix with a random ditribution :math:N_{tt}= < n_t n_t >.

Parameters

  • nb : {int} number of noise stationary intervals, i.e. number of blocks in N_tt'.
  • bandwidth : {int} the width of the diagonal band,e.g. :

    • bandwidth=1 define the first up and low diagonal terms
    • bandwidth=2 2 off diagonal terms.

Returns

  • t: {list of arrays } shape=(nb,bandwidth)
  • diag : {list }, size = nb diagonal values of each block .
Source code in brahmap/utilities/utilities_functions.py
def noise_val(nb, bandwidth=1):
    """
    Generate  elements to fill the  noise covariance
    matrix with a  random ditribution :math:`N_{tt}= < n_t n_t >`.

    **Parameters**

    - ``nb`` : {int}
        number of noise stationary intervals,
        i.e. number  of blocks in N_tt'.
    - ``bandwidth`` : {int}
        the width of the diagonal band,e.g. :

        - ``bandwidth=1`` define the first up and low diagonal terms
        - ``bandwidth=2`` 2 off diagonal terms.

    **Returns**

    - ``t``: {list of arrays }
        ``shape=(nb,bandwidth)``
    - ``diag`` : {list }, ``size = nb``
        diagonal values of each block .

    """
    diag = []
    t = []
    for i in range(nb):
        t.append(np.random.random(size=bandwidth))
    diag = [i[0] for i in t]
    return t, diag

output_profile(pr)

Output of the profiling with :func:profile_run.

Parameter

  • pr: instance returned by :func:profile_run
Source code in brahmap/utilities/utilities_functions.py
def output_profile(pr):
    """
    Output of the profiling with :func:`profile_run`.

    **Parameter**

    - ``pr``:
        instance returned by :func:`profile_run`

    """
    import pstats
    import io

    s = io.StringIO()
    sortby = "cumulative"
    ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
    ps.print_stats()
    print(s.getvalue())
    pass

pairs_gen(nrows, ncols)

Generate random int numbers to fill the pointing matrix for observed pixels. Implemented even for polarization runs.

Source code in brahmap/utilities/utilities_functions.py
def pairs_gen(nrows, ncols):
    """
    Generate random ``int`` numbers   to fill the pointing matrix for observed pixels.
    Implemented even for polarization runs.

    """
    if ncols < 3:
        raise RuntimeError(
            "Not enough pixels!\n Please set Npix >=3, you have set Npix=%d" % ncols
        )

    js = np.random.randint(0, high=ncols, size=nrows)

    return js

profile_run()

Profile the execution with :mod:cProfile

Source code in brahmap/utilities/utilities_functions.py
def profile_run():
    """
    Profile the execution with :mod:`cProfile`
    """
    import cProfile

    pr = cProfile.Profile()
    return pr

subscan_resize(data, subscan)

Resize a tod-size array by considering only the subscan intervals.

Source code in brahmap/utilities/utilities_functions.py
def subscan_resize(data, subscan):
    """
    Resize a tod-size array  by considering only the subscan intervals.
    """
    tmp = []
    for i in range(len(subscan[0])):
        start = subscan[1][i]
        end = subscan[1][i] + subscan[0][i]
        tmp.append(data[start:end])
    return np.concatenate(tmp)

system_setup(nt, npix, nb)

Setup the linear system

Returns

  • d :{array} a nt array of random numbers;
  • pairs: {array } the non-null indices of the pointing matrix;
  • phi :{array} angles if pol=3
  • t,diag : {outputs of :func:noise_val} noise values to construct the noise covariance matrix
Source code in brahmap/utilities/utilities_functions.py
def system_setup(nt, npix, nb):
    """
    Setup the linear system

    **Returns**

    - ``d`` :{array}
        a ``nt`` array of random numbers;
    - ``pairs``: {array }
        the non-null indices of the pointing matrix;
    - phi :{array}
        angles if ``pol=3``
    - t,diag :  {outputs of :func:`noise_val`}
        noise values to construct the noise covariance matrix

    """
    d = np.random.random(nt)
    pairs = pairs_gen(nt, npix)
    phi = angles_gen(rd.uniform(0, np.pi), nt)
    bandsize = 2
    t, diag = noise_val(nb, bandsize)

    return d, pairs, phi, t, diag

lbs_process_timesamples(nside, pointings, pol_angles, pol_idx=3, w=None, threshold_cond=1000.0, obspix=None, galactic_coords=True)

This function accepts the pointing and polarization angle arrays from litebird_sim, rotates them from elliptic to galactic coordinate system, generates the pixel indices of the pointings and then passes them to :func:ProcessTimeSamples.

Args:

  • nside (int): nside for the output map
  • pointings (np.ndarray): An array of detector pointings of shape (nsamp, 2)
  • pol_angles (np.ndarray): A 1-d array of polarization angle
  • pol_idx (int): Type of map-making to use. Defaults to 3.
  • w (np.ndarray): array with noise weights , :math:w_t= N^{-1} _{tt}, computed by :func:BlockLO.build_blocks. If it is not set :func:ProcessTimeSamples.initializeweights assumes it to be a :func:numpy.ones array. Defaults to None.
  • threshold_cond (float): Sets the condition number threshold to mask bad conditioned pixels (it's used in polarization cases). Defaults to 1.e3.
  • obspix (np.ndarray): Map from the internal pixelization to an external one, i.e. HEALPIX, it has to be modified when pathological pixels are not taken into account. It not set, it is assumed to be `numpy.arange(npix). Defaults to None.
  • galactic_coords (bool, optional): Say yes if you want your result in galactic coordinates. Defaults to True.

Returns:

  • pointings (np.ndarray): Pointings as pixel index
  • ProcessTimeSamples: ProcessTimeSamples class
Source code in brahmap/utilities/lbsim_interface.py
def lbs_process_timesamples(
    nside: int,
    pointings: np.ndarray,
    pol_angles: np.ndarray,
    pol_idx: int = 3,
    w: np.ndarray = None,
    threshold_cond: float = 1.0e3,
    obspix: np.ndarray = None,
    galactic_coords: bool = True,
):
    """This function accepts the pointing and polarization angle arrays from `litebird_sim`, rotates them from elliptic to galactic coordinate system, generates the pixel indices of the pointings and then passes them to :func:`ProcessTimeSamples`.

    Args:

    - ``nside`` (int): nside for the output map
    - ``pointings`` (np.ndarray): An array of detector pointings of shape (nsamp, 2)
    - ``pol_angles`` (np.ndarray): A 1-d array of polarization angle
    - ``pol_idx`` (int): Type of map-making to use. Defaults to 3.
    - ``w`` (np.ndarray): array with noise weights , :math:`w_t= N^{-1} _{tt}`, computed by :func:`BlockLO.build_blocks`. If it is  not set :func:`ProcessTimeSamples.initializeweights` assumes it to be a :func:`numpy.ones` array. Defaults to None.
    - ``threshold_cond`` (float): Sets the condition number threshold to mask bad conditioned pixels (it's used in polarization cases). Defaults to 1.e3.
    - ``obspix`` (np.ndarray): Map from the internal pixelization to an external one, i.e. HEALPIX, it has to be modified when pathological pixels are not taken into account. It not set, it is assumed to be `numpy.arange(npix). Defaults to None.
    - ``galactic_coords`` (bool, optional): Say yes if you want your result in galactic coordinates. Defaults to True.

    Returns:

    - ``pointings`` (np.ndarray): Pointings as pixel index
    - ``ProcessTimeSamples``: ProcessTimeSamples class
    """
    if galactic_coords:
        pointings, pol_angles = lbs.coordinates.rotate_coordinates_e2g(
            pointings_ecl=pointings, pol_angle_ecl=pol_angles
        )

    pointings = hp.ang2pix(nside, pointings[:, 0], pointings[:, 1])

    return pointings, ProcessTimeSamples(
        pixs=pointings.copy(),
        npix=hp.nside2npix(nside),
        obspix=obspix,
        pol=pol_idx,
        phi=pol_angles,
        w=w,
        threshold_cond=threshold_cond,
    )