Source code for sensitivity_calculator.mid

"""This module contains classes and methods for use in the SKA MID
Sensitivity Calculator.
"""

import numpy as np
import astropy.constants as ac
from astropy.coordinates import SkyCoord
import astropy.units as u

import sensitivity_calculator.mid_utilities as mid_utils
from sensitivity_calculator.subarray import (
    SubarrayStorage,
    SubarraySchema,
    STORAGE_PATH,
)
from sensitivity_calculator.utilities import (
    Atmosphere,
    DishType,
    TelParams,
    SKA_NDISHES_MAX,
    MEERKAT_NDISHES_MAX,
)


# access to subarray files
subarraystorage = SubarrayStorage(STORAGE_PATH)


# Weather to PWV in mm
WEATHER_TO_PWV = {"Good": 5, "Average": 10, "Bad": 20}


[docs]def SEFD_antenna(Tsys, effective_dish_area): # pylint: disable=invalid-name """Method to calculate the SEFD of an antenna :param Tsys: the system temperature for the dish :type Tsys: astropy.units.Quantity :param effective_dish_area: product of dish area and dish efficiency :type effective_dish_area: astropy.units.Quantity :return: the SEFD of the dish :rtype: astropy.units.Quantity """ return 2 * ac.k_B * Tsys / effective_dish_area
[docs]def SEFD_array(n_type1, n_type2, sefd_dish_type1, sefd_dish_type2): # pylint: disable=invalid-name """Function to compute the SEFD of an heterogeneous array composed of two dish types. :param n_type1: the number of dishes of type 1 :type n_type1: int :param n_type2: the number of dishes of type 2 :type n_type2: int :param sefd_dish_type1: the dish SEFD for type 1 :type sefd_dish_type1: astropy.units.Quantity :param sefd_dish_type2: the dish SEFD for type 2 :type sefd_dish_type2: astropy.units.Quantity :return: the SEFD of the array :rtype: astropy.units.Quantity """ return 1 / np.sqrt( (n_type1 * (n_type1 - 1) / sefd_dish_type1 ** 2) + (2 * n_type1 * n_type2 / (sefd_dish_type1 * sefd_dish_type2)) + (n_type2 * (n_type2 - 1) / sefd_dish_type2 ** 2) )
[docs]class MidCalculator: # pylint: disable = too-many-instance-attributes """This is the Mid calculator class """ def __init__( self, obs_band, obs_freq, bandwidth, array_config, target, weather=None, elevation=None, eta_system=None, eta_point=None, eta_coherence=None, eta_digitisation=None, eta_correlation=None, eta_bandpass=None, n_ska=None, eta_dish_ska=None, Tsys_ska=None, Tspl_ska=None, Trcv_ska=None, n_meer=None, eta_dish_meer=None, Tsys_meer=None, Tspl_meer=None, Trcv_meer=None, Tsky=None, Tgal=None, alpha=None, ): # pylint: disable=invalid-name, too-many-arguments, too-many-locals """Calculator class constructor. """ # pylint: disable=invalid-name self.obs_freq = ( obs_freq if not obs_freq.isscalar else np.array([obs_freq.value]) * obs_freq.unit ) self.obs_band = obs_band self.bandwidth = bandwidth self.weather = weather self.array_config = subarraystorage.load(array_config) # check target is visible at some time self.location = TelParams.mid_core_location() if target.icrs.dec > 90 * u.deg + self.location.lat: # target is never above the horizon raise RuntimeError("target always below horizon") else: self.target = target # get time at which target is next on meridian. self.target_transit_lst = target.icrs.ra self.lst = self.target_transit_lst self.elevation = elevation if elevation is not None else 45.0 * u.deg if ( 90.0 * u.deg - np.abs(self.location.to_geodetic().lat - target.icrs.dec) < self.elevation ): self.elevation = 90.0 * u.deg - np.abs( self.location.to_geodetic().lat - target.icrs.dec ) self.altitude = self.elevation # self.altitude = np.sin(dec) * np.sin(lat) + np.cos(dec) * np.cos(lat) # self.altitude = self.altaz.alt.to(u.deg) # for input values not specified, calculate default attributes self.weather = weather if weather else "Average" # Convert weather to PWV if weather in WEATHER_TO_PWV.keys(): self.weather = WEATHER_TO_PWV[weather] # ..the contributors to eta_system # ..eta_point is forced to be same for SKA1 and MeerKAT as eta_system currently # ..covers both dish types self.eta_point = ( eta_point if eta_point else mid_utils.eta_point(self.obs_freq, DishType.SKA1) ) self.eta_coherence = ( eta_coherence if eta_coherence else mid_utils.eta_coherence(self.obs_freq) ) self.eta_digitisation = ( eta_digitisation if eta_digitisation else mid_utils.eta_digitisation(self.obs_band) ) self.eta_correlation = ( eta_correlation if eta_correlation else mid_utils.eta_correlation() ) self.eta_bandpass = eta_bandpass if eta_bandpass else mid_utils.eta_bandpass() # then eta_system itself self.eta_system = ( eta_system if eta_system else mid_utils.eta_system( self.eta_point, self.eta_coherence, self.eta_digitisation, self.eta_correlation, self.eta_bandpass, ) ) # Number of dishes self.n_ska = n_ska if n_ska is not None else self.array_config.n_SKA self.n_meer = n_meer if n_meer is not None else self.array_config.n_MeerKAT # dish related items self.eta_dish_ska = ( eta_dish_ska if eta_dish_ska else mid_utils.eta_dish(self.obs_freq, DishType.SKA1) ) self.eta_dish_meer = ( eta_dish_meer if eta_dish_meer else mid_utils.eta_dish(self.obs_freq, DishType.MeerKAT) ) self.Trcv_ska = ( Trcv_ska if Trcv_ska is not None else mid_utils.Trcv(self.obs_freq, self.obs_band, DishType.SKA1) ) self.Trcv_meer = ( Trcv_meer if Trcv_meer is not None else mid_utils.Trcv(self.obs_freq, self.obs_band, DishType.MeerKAT) ) self.alpha = alpha if alpha is not None else 2.75 self.Tgal_ska = ( Tgal if Tgal is not None else mid_utils.Tgal(self.target, self.obs_freq, DishType.SKA1, self.alpha) ) self.Tgal_meer = ( Tgal if Tgal is not None else mid_utils.Tgal( self.target, self.obs_freq, DishType.MeerKAT, self.alpha ) ) self.Tgal = 0.5 * (self.Tgal_ska + self.Tgal_meer) self.Tspl_ska = ( Tspl_ska if Tspl_ska is not None else mid_utils.Tspl(DishType.SKA1) ) self.Tspl_meer = ( Tspl_meer if Tspl_meer is not None else mid_utils.Tspl(DishType.MeerKAT) ) self.Tsky_ska = ( Tsky if Tsky is not None else mid_utils.Tsky( self.Tgal_ska, self.obs_freq, self.elevation, self.weather, ) ) self.Tsky_meer = ( Tsky if Tsky is not None else mid_utils.Tsky( self.Tgal_meer, self.obs_freq, self.elevation, self.weather, ) ) self.Tsky = 0.5 * (self.Tsky_ska + self.Tsky_meer) self.Tsys_ska = ( Tsys_ska if Tsys_ska is not None else mid_utils.Tsys_dish( self.Trcv_ska, self.Tspl_ska, self.Tsky_ska, self.obs_freq, ) ) self.Tsys_meer = ( Tsys_meer if Tsys_meer is not None else mid_utils.Tsys_dish( self.Trcv_meer, self.Tspl_meer, self.Tsky_meer, self.obs_freq, ) ) # Compute the derived values of the SEFD self.sefd_ska = SEFD_antenna( self.Tsys_ska, self.eta_dish_ska * TelParams.dish_area(DishType.SKA1), ) self.sefd_meer = SEFD_antenna( self.Tsys_meer, self.eta_dish_meer * TelParams.dish_area(DishType.MeerKAT), ) self.sefd_array = SEFD_array( self.n_ska, self.n_meer, self.sefd_ska, self.sefd_meer )
[docs] def calculate_sensitivity(self, integration_time): """Calculate sensitivity in Janskys for a specified integration time. :param integration_time: the integration time (in seconds or equivalent) :type integration_time: astropy.units.Quantity :return: the sensitivity of the telescope :rtype: astropy.units.Quantity """ # test that integration_time is a time > 0 test = integration_time.to_value(u.s) if test < 0.0: raise RuntimeError("negative integration time") sensitivity = self.sefd_array / ( self.eta_system * np.sqrt(2 * self.bandwidth * integration_time) ) # Now want to calculate Tsys in space so that we can compare with # astronomical fluxes. This means correcting Tsys for the attenuation # due to the atmosphere in the direction of the target. tau = Atmosphere.tau_atm(self.weather, self.obs_freq, self.elevation) return sensitivity * np.exp(tau)
[docs] def calculate_integration_time(self, sensitivity): """Calculate the integration time (in seconds) required to reach the specified sensitivity. :param sensitivity: the required sensitivity (in Jy or equivalent) :type sensitivity: astropy.units.Quantity :return: the integration time required :rtype: astropy.units.Quantity """ # test that sensitivity converts to Jy and is > 0 test = sensitivity.to_value(u.Jy) if test < 0.0: raise RuntimeError("negative sensitivity") tau = Atmosphere.tau_atm(self.weather, self.obs_freq, self.elevation) integration_time = ( np.exp(tau) * self.sefd_array / (sensitivity * self.eta_system) ) ** 2 / (2 * self.bandwidth) integration_time[self.elevation <= 0.0 * u.deg] = -1.0 * u.s return integration_time.to(u.s)
[docs] def state(self): """ extracts values that are either provided explicitly or calculated implicitly and pass them to the front end to populate the SC form. """ var_names_not_astropy_quantities = [ "weather", "eta_point", "eta_coherence", "eta_digitisation", "eta_correlation", "eta_bandpass", "n_ska", "eta_dish_ska", "n_meer", "eta_dish_meer", "alpha", ] var_names_astropy_quantities = [ "Tsys_ska", "Tspl_ska", "Trcv_ska", "Tsys_meer", "Tspl_meer", "Trcv_meer", "Tsky", "Tgal", "altitude", ] return_dict = { var_name: getattr(self, var_name) for var_name in var_names_not_astropy_quantities } return_dict.update( { var_name: getattr(self, var_name).value for var_name in var_names_astropy_quantities } ) return return_dict