Skip to content

brahmap.lbsim.LBSimProcessTimeSamples

Bases: ProcessTimeSamples

Source code in brahmap/lbsim/lbsim_process_time_samples.py
class LBSimProcessTimeSamples(ProcessTimeSamples):
    def __init__(
        self,
        nside: int,
        observations: Union[lbs.Observation, List[lbs.Observation]],
        pointings: Union[np.ndarray, List[np.ndarray], None] = None,
        hwp: Optional[lbs.HWP] = None,
        pointings_flag: Union[np.ndarray, None] = None,
        solver_type: SolverType = SolverType.IQU,
        noise_weights: Union[np.ndarray, None] = None,
        output_coordinate_system: lbs.CoordinateSystem = lbs.CoordinateSystem.Galactic,
        threshold: float = 1.0e-5,
        dtype_float=np.float64,
    ):
        self.__nside = nside
        self.__coordinate_system = output_coordinate_system
        npix = hp.nside2npix(self.nside)

        (
            self.__obs_list,
            ptg_list,
        ) = lbs.pointings_in_obs._normalize_observations_and_pointings(
            observations=observations, pointings=pointings
        )

        num_total_samples = 0
        for obs in self.obs_list:
            num_total_samples += obs.n_detectors * obs.n_samples

        pix_indices = np.empty(num_total_samples, dtype=int)
        pol_angles = np.empty(num_total_samples, dtype=dtype_float)

        start_idx = 0
        end_idx = 0
        for obs_idx, (obs, curr_pointings) in enumerate(zip(self.obs_list, ptg_list)):
            if hwp is None:
                hwp_angle = None
            else:
                hwp_angle = lbs.pointings_in_obs._get_hwp_angle(
                    obs=obs, hwp=hwp, pointing_dtype=dtype_float
                )

            for det_idx in range(obs.n_detectors):
                (
                    curr_pointings_det,
                    hwp_angle,
                ) = lbs.pointings_in_obs._get_pointings_array(
                    detector_idx=det_idx,
                    pointings=curr_pointings,
                    hwp_angle=hwp_angle,
                    output_coordinate_system=output_coordinate_system,
                    pointings_dtype=dtype_float,
                )

                end_idx += obs.n_samples

                pol_angles[start_idx:end_idx] = lbs.pointings_in_obs._get_pol_angle(
                    curr_pointings_det=curr_pointings_det,
                    hwp_angle=hwp_angle,
                    pol_angle_detectors=obs.pol_angle_rad[det_idx],
                )

                pix_indices[start_idx:end_idx] = hp.ang2pix(
                    nside, curr_pointings_det[:, 0], curr_pointings_det[:, 1]
                )

                start_idx = end_idx

            del hwp_angle, curr_pointings_det

        del curr_pointings

        super().__init__(
            npix=npix,
            pointings=pix_indices,
            pointings_flag=pointings_flag,
            solver_type=solver_type,
            pol_angles=pol_angles,
            noise_weights=noise_weights,
            threshold=threshold,
            dtype_float=dtype_float,
            update_pointings_inplace=True,
        )

    @property
    def obs_list(self):
        return self.__obs_list

    @property
    def nside(self):
        return self.__nside

    @property
    def coordinate_system(self):
        return self.__coordinate_system