"""This module contains classes and methods for use in the SKA MID
Sensitivity Calculator.
"""
import numpy as np
import astropy.constants as ac
from astropy.coordinates import SkyCoord
import astropy.units as u
import sensitivity_calculator.mid_utilities as mid_utils
from sensitivity_calculator.subarray import (
SubarrayStorage,
SubarraySchema,
STORAGE_PATH,
)
from sensitivity_calculator.utilities import (
Atmosphere,
DishType,
TelParams,
SKA_NDISHES_MAX,
MEERKAT_NDISHES_MAX,
)
# access to subarray files
subarraystorage = SubarrayStorage(STORAGE_PATH)
# Weather to PWV in mm
WEATHER_TO_PWV = {"Good": 5, "Average": 10, "Bad": 20}
[docs]def SEFD_antenna(Tsys, effective_dish_area): # pylint: disable=invalid-name
"""Method to calculate the SEFD of an antenna
:param Tsys: the system temperature for the dish
:type Tsys: astropy.units.Quantity
:param effective_dish_area: product of dish area and dish efficiency
:type effective_dish_area: astropy.units.Quantity
:return: the SEFD of the dish
:rtype: astropy.units.Quantity
"""
return 2 * ac.k_B * Tsys / effective_dish_area
[docs]def SEFD_array(n_type1, n_type2, sefd_dish_type1, sefd_dish_type2):
# pylint: disable=invalid-name
"""Function to compute the SEFD of an heterogeneous array composed
of two dish types.
:param n_type1: the number of dishes of type 1
:type n_type1: int
:param n_type2: the number of dishes of type 2
:type n_type2: int
:param sefd_dish_type1: the dish SEFD for type 1
:type sefd_dish_type1: astropy.units.Quantity
:param sefd_dish_type2: the dish SEFD for type 2
:type sefd_dish_type2: astropy.units.Quantity
:return: the SEFD of the array
:rtype: astropy.units.Quantity
"""
return 1 / np.sqrt(
(n_type1 * (n_type1 - 1) / sefd_dish_type1 ** 2)
+ (2 * n_type1 * n_type2 / (sefd_dish_type1 * sefd_dish_type2))
+ (n_type2 * (n_type2 - 1) / sefd_dish_type2 ** 2)
)
[docs]class MidCalculator: # pylint: disable = too-many-instance-attributes
"""This is the Mid calculator class
"""
def __init__(
self,
obs_band,
obs_freq,
bandwidth,
array_config,
target,
weather=None,
elevation=None,
eta_system=None,
eta_point=None,
eta_coherence=None,
eta_digitisation=None,
eta_correlation=None,
eta_bandpass=None,
n_ska=None,
eta_dish_ska=None,
Tsys_ska=None,
Tspl_ska=None,
Trcv_ska=None,
n_meer=None,
eta_dish_meer=None,
Tsys_meer=None,
Tspl_meer=None,
Trcv_meer=None,
Tsky=None,
Tgal=None,
alpha=None,
): # pylint: disable=invalid-name, too-many-arguments, too-many-locals
"""Calculator class constructor.
"""
# pylint: disable=invalid-name
self.obs_freq = (
obs_freq
if not obs_freq.isscalar
else np.array([obs_freq.value]) * obs_freq.unit
)
self.obs_band = obs_band
self.bandwidth = bandwidth
self.weather = weather
self.array_config = subarraystorage.load(array_config)
# check target is visible at some time
self.location = TelParams.mid_core_location()
if target.icrs.dec > 90 * u.deg + self.location.lat:
# target is never above the horizon
raise RuntimeError("target always below horizon")
else:
self.target = target
# get time at which target is next on meridian.
self.target_transit_lst = target.icrs.ra
self.lst = self.target_transit_lst
self.elevation = elevation if elevation is not None else 45.0 * u.deg
if (
90.0 * u.deg - np.abs(self.location.to_geodetic().lat - target.icrs.dec)
< self.elevation
):
self.elevation = 90.0 * u.deg - np.abs(
self.location.to_geodetic().lat - target.icrs.dec
)
self.altitude = self.elevation
# self.altitude = np.sin(dec) * np.sin(lat) + np.cos(dec) * np.cos(lat)
# self.altitude = self.altaz.alt.to(u.deg)
# for input values not specified, calculate default attributes
self.weather = weather if weather else "Average"
# Convert weather to PWV
if weather in WEATHER_TO_PWV.keys():
self.weather = WEATHER_TO_PWV[weather]
# ..the contributors to eta_system
# ..eta_point is forced to be same for SKA1 and MeerKAT as eta_system currently
# ..covers both dish types
self.eta_point = (
eta_point
if eta_point
else mid_utils.eta_point(self.obs_freq, DishType.SKA1)
)
self.eta_coherence = (
eta_coherence if eta_coherence else mid_utils.eta_coherence(self.obs_freq)
)
self.eta_digitisation = (
eta_digitisation
if eta_digitisation
else mid_utils.eta_digitisation(self.obs_band)
)
self.eta_correlation = (
eta_correlation if eta_correlation else mid_utils.eta_correlation()
)
self.eta_bandpass = eta_bandpass if eta_bandpass else mid_utils.eta_bandpass()
# then eta_system itself
self.eta_system = (
eta_system
if eta_system
else mid_utils.eta_system(
self.eta_point,
self.eta_coherence,
self.eta_digitisation,
self.eta_correlation,
self.eta_bandpass,
)
)
# Number of dishes
self.n_ska = n_ska if n_ska is not None else self.array_config.n_SKA
self.n_meer = n_meer if n_meer is not None else self.array_config.n_MeerKAT
# dish related items
self.eta_dish_ska = (
eta_dish_ska
if eta_dish_ska
else mid_utils.eta_dish(self.obs_freq, DishType.SKA1)
)
self.eta_dish_meer = (
eta_dish_meer
if eta_dish_meer
else mid_utils.eta_dish(self.obs_freq, DishType.MeerKAT)
)
self.Trcv_ska = (
Trcv_ska
if Trcv_ska is not None
else mid_utils.Trcv(self.obs_freq, self.obs_band, DishType.SKA1)
)
self.Trcv_meer = (
Trcv_meer
if Trcv_meer is not None
else mid_utils.Trcv(self.obs_freq, self.obs_band, DishType.MeerKAT)
)
self.alpha = alpha if alpha is not None else 2.75
self.Tgal_ska = (
Tgal
if Tgal is not None
else mid_utils.Tgal(self.target, self.obs_freq, DishType.SKA1, self.alpha)
)
self.Tgal_meer = (
Tgal
if Tgal is not None
else mid_utils.Tgal(
self.target, self.obs_freq, DishType.MeerKAT, self.alpha
)
)
self.Tgal = 0.5 * (self.Tgal_ska + self.Tgal_meer)
self.Tspl_ska = (
Tspl_ska if Tspl_ska is not None else mid_utils.Tspl(DishType.SKA1)
)
self.Tspl_meer = (
Tspl_meer if Tspl_meer is not None else mid_utils.Tspl(DishType.MeerKAT)
)
self.Tsky_ska = (
Tsky
if Tsky is not None
else mid_utils.Tsky(
self.Tgal_ska, self.obs_freq, self.elevation, self.weather,
)
)
self.Tsky_meer = (
Tsky
if Tsky is not None
else mid_utils.Tsky(
self.Tgal_meer, self.obs_freq, self.elevation, self.weather,
)
)
self.Tsky = 0.5 * (self.Tsky_ska + self.Tsky_meer)
self.Tsys_ska = (
Tsys_ska
if Tsys_ska is not None
else mid_utils.Tsys_dish(
self.Trcv_ska, self.Tspl_ska, self.Tsky_ska, self.obs_freq,
)
)
self.Tsys_meer = (
Tsys_meer
if Tsys_meer is not None
else mid_utils.Tsys_dish(
self.Trcv_meer, self.Tspl_meer, self.Tsky_meer, self.obs_freq,
)
)
# Compute the derived values of the SEFD
self.sefd_ska = SEFD_antenna(
self.Tsys_ska, self.eta_dish_ska * TelParams.dish_area(DishType.SKA1),
)
self.sefd_meer = SEFD_antenna(
self.Tsys_meer, self.eta_dish_meer * TelParams.dish_area(DishType.MeerKAT),
)
self.sefd_array = SEFD_array(
self.n_ska, self.n_meer, self.sefd_ska, self.sefd_meer
)
[docs] def calculate_sensitivity(self, integration_time):
"""Calculate sensitivity in Janskys for a specified integration time.
:param integration_time: the integration time (in seconds or equivalent)
:type integration_time: astropy.units.Quantity
:return: the sensitivity of the telescope
:rtype: astropy.units.Quantity
"""
# test that integration_time is a time > 0
test = integration_time.to_value(u.s)
if test < 0.0:
raise RuntimeError("negative integration time")
sensitivity = self.sefd_array / (
self.eta_system * np.sqrt(2 * self.bandwidth * integration_time)
)
# Now want to calculate Tsys in space so that we can compare with
# astronomical fluxes. This means correcting Tsys for the attenuation
# due to the atmosphere in the direction of the target.
tau = Atmosphere.tau_atm(self.weather, self.obs_freq, self.elevation)
return sensitivity * np.exp(tau)
[docs] def calculate_integration_time(self, sensitivity):
"""Calculate the integration time (in seconds) required to reach the specified sensitivity.
:param sensitivity: the required sensitivity (in Jy or equivalent)
:type sensitivity: astropy.units.Quantity
:return: the integration time required
:rtype: astropy.units.Quantity
"""
# test that sensitivity converts to Jy and is > 0
test = sensitivity.to_value(u.Jy)
if test < 0.0:
raise RuntimeError("negative sensitivity")
tau = Atmosphere.tau_atm(self.weather, self.obs_freq, self.elevation)
integration_time = (
np.exp(tau) * self.sefd_array / (sensitivity * self.eta_system)
) ** 2 / (2 * self.bandwidth)
integration_time[self.elevation <= 0.0 * u.deg] = -1.0 * u.s
return integration_time.to(u.s)
[docs] def state(self):
"""
extracts values that are either provided explicitly or calculated implicitly and
pass them to the front end to populate the SC form.
"""
var_names_not_astropy_quantities = [
"weather",
"eta_point",
"eta_coherence",
"eta_digitisation",
"eta_correlation",
"eta_bandpass",
"n_ska",
"eta_dish_ska",
"n_meer",
"eta_dish_meer",
"alpha",
]
var_names_astropy_quantities = [
"Tsys_ska",
"Tspl_ska",
"Trcv_ska",
"Tsys_meer",
"Tspl_meer",
"Trcv_meer",
"Tsky",
"Tgal",
"altitude",
]
return_dict = {
var_name: getattr(self, var_name)
for var_name in var_names_not_astropy_quantities
}
return_dict.update(
{
var_name: getattr(self, var_name).value
for var_name in var_names_astropy_quantities
}
)
return return_dict