import json
from abc import ABC, abstractmethod
from threading import Lock
from typing import List, Tuple, Union
import astropy.units as u
import fsspec
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.table import Table, Column
from cachetools import TTLCache, cached
from spherical_geometry.polygon import SphericalPolygon
from .cutout import Cutout
FFI_TTLCACHE = TTLCache(maxsize=10, ttl=900) # Cache for FFI footprint files
class FootprintCutout(Cutout, ABC):
"""
Abstract class that creates cutouts from data files hosted on the S3 cloud.
Parameters
----------
coordinates : str | `~astropy.coordinates.SkyCoord`
Coordinates of the center of the cutout.
cutout_size : int | array | list | tuple | `~astropy.units.Quantity`
Size of the cutout array.
fill_value : int | float
Value to fill the cutout with if the cutout is outside the image.
limit_rounding_method : str
Method to use for rounding the cutout limits. Options are 'round', 'ceil', and 'floor'.
sequence : int | list | None
Default None. Sequence(s) from which to generate cutouts. Can provide a single
sequence number as an int or a list of sequence numbers. If not specified,
cutouts will be generated from all sequences that contain the cutout.
verbose : bool
If True, log messages are printed to the console.
Methods
-------
cutout()
Fetch the cloud files that contain the cutout and generate the cutouts.
"""
def __init__(self, coordinates: Union[SkyCoord, str],
cutout_size: Union[int, np.ndarray, u.Quantity, List[int], Tuple[int]] = 25,
fill_value: Union[int, float] = np.nan, limit_rounding_method: str = 'round',
sequence: Union[int, List[int], None] = None, verbose: bool = False):
super().__init__([], coordinates, cutout_size, fill_value, limit_rounding_method, verbose)
# Assigning other attributes
if isinstance(sequence, int):
sequence = [sequence] # Convert to list
self._sequence = sequence
# Populate these in child classes
self._s3_footprint_cache = None # S3 URI to footprint cache file
self._arcsec_per_px = None # Number of arcseconds per pixel in an image
@abstractmethod
def _get_files_from_cone_results(self, cone_results: Table) -> dict:
"""
Converts a `~astropy.table.Table` of cone search results to a list of dictionaries containing
metadata for each cloud file that intersects with the cutout.
This method is abstract and should be implemented in subclasses.
"""
raise NotImplementedError('Subclasses must implement this method.')
@abstractmethod
def cutout(self):
"""
Generate cutouts from the cloud files that contain the cutout's footprint.
This method is abstract and should be implemented in subclasses.
"""
raise NotImplementedError('Subclasses must implement this method.')
@staticmethod
def _s_region_to_polygon(s_region: Column) -> Column:
"""
Returns a column of `~spherical_geometry.polygon.SphericalPolygon` objects from a column of
s_region strings.
Parameters
----------
s_region : `~astropy.table.Column`
Column containing the s_region string. Example input: 'POLYGON 229.80771900 -75.17048500
241.67788000 -63.95992300 269.94872000 -64.39276400 277.87862300 -75.57754400'
Returns
-------
polygon : `~astropy.table.Column`
Column containing `~spherical_geometry.polygon.SphericalPolygon` objects representing each s_region.
"""
def ind_sregion_to_polygon(s_reg):
"""
Helper function to convert s_region string to a `~spherical_geometry.polygon.SphericalPolygon` object.
Parameters
----------
s_reg : str
A string defining a spatial region, expected to be in the 'POLYGON' format.
Returns
-------
`~spherical_geometry.polygon.SphericalPolygon`
A SphericalPolygon object created from the provided coordinates.
Raises
------
ValueError
If the S_REGION type is not 'POLYGON'.
"""
# Split input string into individual components
sr_list = s_reg.strip().split()
# Extract the region type (first element of list)
reg_type = sr_list[0].upper()
if reg_type == 'POLYGON':
# Extract RA and Dec values
# RAs are at odd indices
ras = np.array(sr_list[1::2], dtype=float)
# Convert negative RAs to the 0-360 range
ras[ras < 0] = ras[ras < 0] + 360
# Decs are at even indices
decs = np.array(sr_list[2::2], dtype=float)
# Create SphericalPolygon object
return SphericalPolygon.from_radec(ras, decs)
else:
raise ValueError(f'Unsupported s_region type: {reg_type}.')
return np.vectorize(ind_sregion_to_polygon)(s_region)
@staticmethod
def _ffi_intersect(ffi_list: Table, polygon: SphericalPolygon) -> np.ndarray:
"""
Vectorizing the spherical_coordinate intersects_polygon function.
Parameters
----------
ffi_list : `~astropy.table.Table`
Table containing information about FFIs and their footprints.
polygon : `~spherical_geometry.polygon.SphericalPolygon`
SphericalPolygon object representing the cutout's footprint.
Returns
-------
intersect : `~numpy.ndarray`
Boolean array indicating whether each FFI intersects with the cutout.
"""
def single_intersect(ffi, polygon):
return ffi.intersects_poly(polygon)
return np.vectorize(single_intersect)(ffi_list['polygon'], polygon)
@cached(cache=FFI_TTLCACHE, lock=Lock())
def get_ffis(s3_footprint_cache: str) -> Table:
"""
Fetches footprints for Full Frame Images (FFIs) from S3. The resulting
table contains each (FFI) and a 'polygon' column that describes the image's footprints as polygon points
and vectors.
This method is outside the class definition to allow for caching.
Parameters
----------
s3_footprint_cache : str
S3 URI to the footprint cache file.
Returns
-------
ffis : `~astropy.table.Table`
Table containing information about FFIs and their footprints.
"""
# Open footprint file with fsspec
with fsspec.open(s3_footprint_cache, s3={'anon': True}) as f:
ffis = json.load(f)
# Compute spherical polygons
ffis['polygon'] = FootprintCutout._s_region_to_polygon(ffis['s_region'])
# Convert to Astropy table
ffis = Table(ffis)
return ffis
[docs]def ra_dec_crossmatch(all_ffis: Table, coordinates: SkyCoord, cutout_size, arcsec_per_px: int = 21) -> Table:
"""
Returns the Full Frame Images (FFIs) whose footprints overlap with a cutout of a given position and size.
Parameters
----------
all_ffis : `~astropy.table.Table`
Table of FFIs to crossmatch with the cutout.
coordinates : str or `astropy.coordinates.SkyCoord` object
The position around which to cutout.
It may be specified as a string ("ra dec" in degrees)
or as the appropriate `~astropy.coordinates.SkyCoord` object.
cutout_size : int, array-like, `~astropy.units.Quantity`
The size of the cutout array. If ``cutout_size``
is a scalar number or a scalar `~astropy.units.Quantity`,
then a square cutout of ``cutout_size`` will be created. If
``cutout_size`` has two elements, they should be in ``(ny, nx)``
order. Scalar numbers in ``cutout_size`` are assumed to be in
units of pixels. `~astropy.units.Quantity` objects must be in pixel or
angular units.
arcsec_per_px : int, optional
Default 21. The number of arcseconds per pixel in an image. Used to determine
the footprint of the cutout. Default is the number of arcseconds per pixel in
a TESS image.
Returns
-------
matching_ffis : `~astropy.table.Table`
Table containing information about FFIs whose footprints overlap those of the cutout.
"""
# Convert coordinates to SkyCoord
if not isinstance(coordinates, SkyCoord):
coordinates = SkyCoord(coordinates, unit='deg')
ra, dec = coordinates.ra, coordinates.dec
# Parse cutout size
cutout_size = Cutout._parse_size_input(cutout_size)
# Create polygon for intersection
# Convert dimensions from pixels to arcseconds and divide by 2 to get offset from center
ra_offset = ((cutout_size[0] * arcsec_per_px) / 2) * u.arcsec
dec_offset = ((cutout_size[1] * arcsec_per_px) / 2) * u.arcsec
# Calculate RA and Dec boundaries
ra_bounds = [ra - ra_offset, ra + ra_offset]
dec_bounds = [dec - dec_offset, dec + dec_offset]
# Get RA and Dec for four corners of rectangle
ras = [ra_bounds[0].value, ra_bounds[1].value, ra_bounds[1].value, ra_bounds[0].value]
decs = [dec_bounds[0].value, dec_bounds[0].value, dec_bounds[1].value, dec_bounds[1].value]
# Create SphericalPolygon for comparison
cutout_fp = SphericalPolygon.from_radec(ras, decs, center=(ra, dec))
# Find indices of FFIs that intersect with the cutout
ffi_inds = np.vectorize(lambda ffi: ffi.intersects_poly(cutout_fp))(all_ffis['polygon'])
ffi_inds = FootprintCutout._ffi_intersect(all_ffis, cutout_fp)
return all_ffis[ffi_inds]