# Source code for astrocut.cube_cut

# Licensed under a 3-clause BSD style license - see LICENSE.rst

"""This module implements the cutout functionality."""

import os
import warnings
from concurrent.futures import ThreadPoolExecutor
from itertools import product
from time import time
from typing import Any, Dict, Literal, Union

import astropy.units as u
import numpy as np
from astropy import wcs
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.time import Time

from . import __version__
from .exceptions import InputWarning, InvalidQueryError
from .utils.wcs_fitting import fit_wcs_from_points

# todo: investigate why for small cutouts the astropy version is not working
# from astropy.wcs.utils import fit_wcs_from_points



class CutoutFactory():
    """
    Class for creating image cutouts.

    This class encompasses all of the cutout functionality.
    In the current version this means creating cutout target pixel files from both
    SPOC (Science Processing Operations Center) and TICA (Tess Image CAlibration)
    full frame image cubes.

    Future versions will include more generalized cutout functionality.
    """

    def __init__(self):
        """
        Initialization function.
        """

        self.product = None  # Cube product type ('SPOC' or 'TICA'), set by `cube_cut`
        self.cube_wcs = None  # WCS information from the image cube
        self.cutout_wcs = None  # WCS information (linear) for the cutout
        # Goodness-of-fit measures for the linear cutout WCS, filled by `_fit_cutout_wcs`
        self.cutout_wcs_fit = {
            "WCS_MSEP": [None, "[deg] Max offset between cutout WCS and FFI WCS"],
            "WCS_SIG": [None, "[deg] Error measurement of cutout WCS fit"],
        }

        self.cutout_lims = np.zeros((2, 2), dtype=int)  # Cutout pixel limits, [[ymin,ymax],[xmin,xmax]]
        self.center_coord = None  # Central skycoord

        # Extra keywords from the FFI image headers in SPOC.
        # These are applied to both SPOC and TICA cutouts for consistency.
        # Values are [value, FITS comment] pairs; values are filled by `_parse_table_info`.
        self.img_kwds = {
            "BACKAPP": [None, "background is subtracted"],
            "CDPP0_5": [None, "RMS CDPP on 0.5-hr time scales"],
            "CDPP1_0": [None, "RMS CDPP on 1.0-hr time scales"],
            "CDPP2_0": [None, "RMS CDPP on 2.0-hr time scales"],
            "CROWDSAP": [None, "Ratio of target flux to total flux in op. ap."],
            "DEADAPP": [None, "deadtime applied"],
            "DEADC": [None, "deadtime correction"],
            "EXPOSURE": [None, "[d] time on source"],
            "FLFRCSAP": [None, "Frac. of target flux w/in the op. aperture"],
            "FRAMETIM": [None, "[s] frame time [INT_TIME + READTIME]"],
            "FXDOFF": [None, "compression fixed offset"],
            "GAINA": [None, "[electrons/count] CCD output A gain"],
            "GAINB": [None, "[electrons/count] CCD output B gain"],
            "GAINC": [None, "[electrons/count] CCD output C gain"],
            "GAIND": [None, "[electrons/count] CCD output D gain"],
            "INT_TIME": [None, "[s] photon accumulation time per frame"],
            "LIVETIME": [None, "[d] TELAPSE multiplied by DEADC"],
            "MEANBLCA": [None, "[count] FSW mean black level CCD output A"],
            "MEANBLCB": [None, "[count] FSW mean black level CCD output B"],
            "MEANBLCC": [None, "[count] FSW mean black level CCD output C"],
            "MEANBLCD": [None, "[count] FSW mean black level CCD output D"],
            "NREADOUT": [None, "number of read per cadence"],
            "NUM_FRM": [None, "number of frames per time stamp"],
            "READNOIA": [None, "[electrons] read noise CCD output A"],
            "READNOIB": [None, "[electrons] read noise CCD output B"],
            "READNOIC": [None, "[electrons] read noise CCD output C"],
            "READNOID": [None, "[electrons] read noise CCD output D"],
            "READTIME": [None, "[s] readout time per frame"],
            "TIERRELA": [None, "[d] relative time error"],
            "TIMEDEL": [None, "[d] time resolution of data"],
            "TIMEPIXR": [None, "bin time beginning=0 middle=0.5 end=1"],
            "TMOFST11": [None, "(s) readout delay for camera 1 and ccd 1"],
            "VIGNAPP": [None, "vignetting or collimator correction applied"],
        }

    def _parse_table_info(self, table_data, verbose=False):
        """
        Takes the header and the middle entry from the cube table (EXT 2) of image header data,
        builds a WCS object that encapsulates the given WCS information,
        and collects into a dictionary the other keywords we care about.

        The WCS is stored in ``self.cube_wcs``, and the extra keywords in ``self.img_kwds``

        Parameters
        ----------
        table_data : `~astropy.io.fits.fitsrec.FITS_rec`
            The cube image header data table.
        verbose : bool
            Optional. If true intermediate information is printed.

        Raises
        ------
        `~astropy.wcs.NoWcsKeywordsFoundError`
            If no row from the middle of the table onward contains valid WCS keywords.
        """

        # Populating `table_row` with the primary header keywords
        # of the middle FFI
        data_ind = len(table_data)//2
        table_row = None

        # Scan forward from the middle row until a row with usable WCS info is found
        while table_row is None:
            table_row = table_data[data_ind]

            # Making sure we have a row with wcs info.
            wcsaxes_keyword = 'CTYPE2'
            if table_row[wcsaxes_keyword] != 'DEC--TAN-SIP':
                table_row = None
                data_ind += 1
                if data_ind == len(table_data):
                    raise wcs.NoWcsKeywordsFoundError("No FFI rows contain valid WCS keywords.")

        if verbose:
            print("Using WCS from row {} out of {}".format(data_ind, len(table_data)))

        # Turning the table row into a new header object
        wcs_header = fits.header.Header()
        for col in table_data.columns:

            wcs_val = table_row[col.name]
            if (not isinstance(wcs_val, str)) and (np.isnan(wcs_val)):
                continue  # Just skip nans
            wcs_header[col.name] = wcs_val

        # Setting the cube wcs
        self.cube_wcs = wcs.WCS(wcs_header, relax=True)

        # Filling the img_kwds dictionary while we are here
        for kwd in self.img_kwds:
            self.img_kwds[kwd][0] = wcs_header.get(kwd)
        # Adding the info about which FFI we got the WCS info from
        self.img_kwds["WCS_FFI"] = [table_data[data_ind]["FFI_FILE"],
                                    "FFI used for cutout WCS"]

    def _get_cutout_limits(self, cutout_size):
        """
        Takes the center coordinates, cutout size, and the wcs from
        which the cutout is being taken and calculates the x and y pixel limits
        for the cutout.

        The limits are stored in ``self.cutout_lims`` as an array of the form
        [[ymin,ymax],[xmin,xmax]] (nothing is returned).

        Parameters
        ----------
        cutout_size : array
            [ny, nx] with ints (pixels) or astropy quantities (pixel or angular units).

        Raises
        ------
        `~astrocut.exceptions.InvalidQueryError`
            If the cutout location is not within the cube footprint.
        """

        # Note: This is returning the center pixel in 1-up
        try:
            center_pixel = self.center_coord.to_pixel(self.cube_wcs, 1)
        except wcs.NoConvergence:  # If wcs can't converge, center coordinate is far from the footprint
            raise InvalidQueryError("Cutout location is not in cube footprint!")

        lims = np.zeros((2, 2), dtype=int)

        for axis, size in enumerate(cutout_size):

            if not isinstance(size, u.Quantity):  # assume pixels
                dim = size / 2
            elif size.unit == u.pixel:  # also pixels
                dim = size.value / 2
            elif size.unit.physical_type == 'angle':
                # Convert angular size to pixels using the cube's pixel scale on this axis
                pixel_scale = u.Quantity(wcs.utils.proj_plane_pixel_scales(self.cube_wcs)[axis],
                                         self.cube_wcs.wcs.cunit[axis])
                dim = (size / pixel_scale).decompose() / 2

            # `- 1` converts the 1-up center pixel back to 0-up array indices
            lims[axis, 0] = int(np.round(center_pixel[axis] - 1 - dim))
            lims[axis, 1] = int(np.round(center_pixel[axis] - 1 + dim))

            # The case where the requested area is so small it rounds to zero
            if lims[axis, 0] == lims[axis, 1]:
                lims[axis, 0] = int(np.floor(center_pixel[axis] - 1))
                lims[axis, 1] = int(np.ceil(center_pixel[axis] - 1))

        # Checking at least some of the cutout is on the cube
        if ((lims[0, 0] <= 0) and (lims[0, 1] <= 0)) or ((lims[1, 0] <= 0) and (lims[1, 1] <= 0)):
            raise InvalidQueryError("Cutout location is not in cube footprint!")

        self.cutout_lims = lims

    def _get_full_cutout_wcs(self, cube_table_header):
        """
        Starting with the full FFI WCS and adjusting it for the cutout WCS.
        Adjusts CRPIX values and adds physical WCS keywords.

        Parameters
        ----------
        cube_table_header : `~astropy.io.fits.Header`
            The FFI cube header for the data table extension. This allows the cutout WCS
            information to more closely match the mission TPF format.

        Returns
        -------
        response : `~astropy.wcs.WCS`
            The cutout WCS object including SIP distortions.
        """

        wcs_header = self.cube_wcs.to_header(relax=True)

        # Using table comment rather than the default ones if available
        for kwd in wcs_header:
            if cube_table_header.get(kwd, None):
                wcs_header.comments[kwd] = cube_table_header[kwd]

        # Adjusting the CRPIX values so the reference pixel is relative to the cutout corner
        wcs_header["CRPIX1"] -= self.cutout_lims[0, 0]
        wcs_header["CRPIX2"] -= self.cutout_lims[1, 0]

        # Adding the physical wcs keywords (map cutout pixels back to FFI CCD coordinates)
        wcs_header.set("WCSNAMEP", "PHYSICAL", "name of world coordinate system alternate P")
        wcs_header.set("WCSAXESP", 2, "number of WCS physical axes")

        wcs_header.set("CTYPE1P", "RAWX", "physical WCS axis 1 type CCD col")
        wcs_header.set("CUNIT1P", "PIXEL", "physical WCS axis 1 unit")
        wcs_header.set("CRPIX1P", 1, "reference CCD column")
        wcs_header.set("CRVAL1P", self.cutout_lims[0, 0] + 1, "value at reference CCD column")
        wcs_header.set("CDELT1P", 1.0, "physical WCS axis 1 step")

        wcs_header.set("CTYPE2P", "RAWY", "physical WCS axis 2 type CCD col")
        wcs_header.set("CUNIT2P", "PIXEL", "physical WCS axis 2 unit")
        wcs_header.set("CRPIX2P", 1, "reference CCD row")
        wcs_header.set("CRVAL2P", self.cutout_lims[1, 0] + 1, "value at reference CCD row")
        wcs_header.set("CDELT2P", 1.0, "physical WCS axis 2 step")

        return wcs.WCS(wcs_header)

    def _fit_cutout_wcs(self, cutout_wcs, cutout_shape):
        """
        Given a full (including SIP coefficients) wcs for the cutout,
        calculate the best fit linear wcs, and a measure of the goodness-of-fit.

        The new WCS is stored in ``self.cutout_wcs``.
        Goodness-of-fit measures are returned and stored in ``self.cutout_wcs_fit``.

        Parameters
        ----------
        cutout_wcs : `~astropy.wcs.WCS`
            The full (including SIP coefficients) cutout WCS object
        cutout_shape : tuple
            The shape of the cutout in the form (width, height).

        Returns
        -------
        response : tuple
            Goodness-of-fit statistics. (max dist, sigma)
        """

        # Getting matched pixel, world coordinate pairs
        # We will choose no more than 100 pixels spread evenly throughout the image
        # Centered on the center pixel.
        # To do this we determine the appropriate "step size" between pixel coordinates
        # (i.e. we take every ith pixel in each row/column)
        # For example in a 5x7 cutout with i = 2 we get:
        #
        # xxoxoxx
        # ooooooo
        # xxoxoxx
        # ooooooo
        # xxoxoxx
        #
        # Where x denotes the indexes that will be used in the fit.
        y, x = cutout_shape
        i = 1
        while (x/i)*(y/i) > 100:
            i += 1

        # Build symmetric sample coordinates about the center, forcing the edges to be included
        xvals = list(reversed(range(x//2, -1, -i)))[:-1] + list(range(x//2, x, i))
        if xvals[-1] != x-1:
            xvals += [x-1]
        if xvals[0] != 0:
            xvals = [0] + xvals

        yvals = list(reversed(range(y//2, -1, -i)))[:-1] + list(range(y//2, y, i))
        if yvals[-1] != y-1:
            yvals += [y-1]
        if yvals[0] != 0:
            yvals = [0] + yvals

        pix_inds = np.array(list(product(xvals, yvals)))
        world_pix = SkyCoord(cutout_wcs.all_pix2world(pix_inds, 0), unit='deg')

        # Getting the fit WCS
        linear_wcs = fit_wcs_from_points([pix_inds[:, 0], pix_inds[:, 1]], world_pix, proj_point='center')

        self.cutout_wcs = linear_wcs

        # Checking the fit (we want to use all of the pixels for this)
        pix_inds = np.array(list(product(list(range(cutout_shape[1])), list(range(cutout_shape[0])))))
        world_pix = SkyCoord(cutout_wcs.all_pix2world(pix_inds, 0), unit='deg')
        world_pix_new = SkyCoord(linear_wcs.all_pix2world(pix_inds, 0), unit='deg')

        dists = world_pix.separation(world_pix_new).to('deg')
        sigma = np.sqrt(sum(dists.value**2))

        self.cutout_wcs_fit['WCS_MSEP'][0] = dists.max().value
        self.cutout_wcs_fit['WCS_SIG'][0] = sigma

        return (dists.max(), sigma)

    def _get_cutout_wcs_dict(self):
        """
        Transform the cutout WCS object into the cutout column WCS keywords.
        Adds the physical keywords for transformation back from cutout to location on FFI.
        This is a very TESS specific function.

        Returns
        -------
        response: dict
            Cutout wcs column header keywords as dictionary of
            ``{<kwd format string>: [value, desc]} pairs.``
        """
        wcs_header = self.cutout_wcs.to_header()

        cutout_wcs_dict = dict()

        ## Cutout array keywords ##

        cutout_wcs_dict["WCAX{}"] = [wcs_header['WCSAXES'], "number of WCS axes"]
        # TODO: check for 2? this must be two

        cutout_wcs_dict["1CTYP{}"] = [wcs_header["CTYPE1"], "right ascension coordinate type"]
        cutout_wcs_dict["2CTYP{}"] = [wcs_header["CTYPE2"], "declination coordinate type"]

        cutout_wcs_dict["1CRPX{}"] = [wcs_header["CRPIX1"], "[pixel] reference pixel along image axis 1"]
        cutout_wcs_dict["2CRPX{}"] = [wcs_header["CRPIX2"], "[pixel] reference pixel along image axis 2"]

        cutout_wcs_dict["1CRVL{}"] = [wcs_header["CRVAL1"], "[deg] right ascension at reference pixel"]
        cutout_wcs_dict["2CRVL{}"] = [wcs_header["CRVAL2"], "[deg] declination at reference pixel"]

        cutout_wcs_dict["1CUNI{}"] = [wcs_header["CUNIT1"], "physical unit in column dimension"]
        cutout_wcs_dict["2CUNI{}"] = [wcs_header["CUNIT2"], "physical unit in row dimension"]

        cutout_wcs_dict["1CDLT{}"] = [wcs_header["CDELT1"], "[deg] pixel scale in RA dimension"]
        # NOTE(review): this reads CDELT1 for the DEC dimension — likely should be CDELT2; confirm
        cutout_wcs_dict["2CDLT{}"] = [wcs_header["CDELT1"], "[deg] pixel scale in DEC dimension"]

        cutout_wcs_dict["11PC{}"] = [wcs_header["PC1_1"], "Coordinate transformation matrix element"]
        cutout_wcs_dict["12PC{}"] = [wcs_header["PC1_2"], "Coordinate transformation matrix element"]
        cutout_wcs_dict["21PC{}"] = [wcs_header["PC2_1"], "Coordinate transformation matrix element"]
        cutout_wcs_dict["22PC{}"] = [wcs_header["PC2_2"], "Coordinate transformation matrix element"]

        ## Physical keywords ##
        # TODO: Make sure these are correct
        cutout_wcs_dict["WCSN{}P"] = ["PHYSICAL", "table column WCS name"]
        cutout_wcs_dict["WCAX{}P"] = [2, "table column physical WCS dimensions"]

        cutout_wcs_dict["1CTY{}P"] = ["RAWX", "table column physical WCS axis 1 type, CCD col"]
        cutout_wcs_dict["2CTY{}P"] = ["RAWY", "table column physical WCS axis 2 type, CCD row"]

        cutout_wcs_dict["1CUN{}P"] = ["PIXEL", "table column physical WCS axis 1 unit"]
        cutout_wcs_dict["2CUN{}P"] = ["PIXEL", "table column physical WCS axis 2 unit"]

        cutout_wcs_dict["1CRV{}P"] = [self.cutout_lims[0, 0] + 1,
                                      "table column physical WCS ax 1 ref value"]
        cutout_wcs_dict["2CRV{}P"] = [self.cutout_lims[1, 0] + 1,
                                      "table column physical WCS ax 2 ref value"]

        # TODO: can we calculate these? or are they fixed?
        cutout_wcs_dict["1CDL{}P"] = [1.0, "table column physical WCS a1 step"]
        cutout_wcs_dict["2CDL{}P"] = [1.0, "table column physical WCS a2 step"]

        cutout_wcs_dict["1CRP{}P"] = [1, "table column physical WCS a1 reference"]
        cutout_wcs_dict["2CRP{}P"] = [1, "table column physical WCS a2 reference"]

        return cutout_wcs_dict

    def _get_cutout(self, transposed_cube, threads: Union[int, Literal["auto"]] = 1, verbose=True):
        """
        Making a cutout from an image/uncertainty cube that has been transposed
        to have time on the longest axis.

        Parameters
        ----------
        transposed_cube : `numpy.array`
            Transposed image/uncertainty array.
        threads : int, "auto", default=1
            Number of threads to use when making remote (e.g. s3) cutouts, will not use threads for local access
            <=1 disables the threadpool, >1 sets threadpool to the specified number of threads,
            "auto" uses `concurrent.futures.ThreadPoolExecutor`'s default: cpu_count + 4, limit to max of 32
        verbose : bool
            Optional. If true intermediate information is printed.

        Returns
        -------
        response :  `numpy.array`, `numpy.array` (or `None`), `numpy.array`
            The untransposed image cutout array,
            the untransposed uncertainty cutout array (if `self.product` is SPOC, otherwise returns `None`),
            and the aperture array (an array the size of a single cutout
            that is 1 where there is image data and 0 where there isn't)
        """

        # These limits are not guarenteed to be within the image footprint
        xmin, xmax = self.cutout_lims[1]
        ymin, ymax = self.cutout_lims[0]

        # Get the image array limits
        xmax_cube, ymax_cube, _, _ = transposed_cube.shape

        # Adjust limits and figuring out the padding
        padding = np.zeros((3, 2), dtype=int)
        if xmin < 0:
            padding[1, 0] = -xmin
            xmin = 0
        if ymin < 0:
            padding[2, 0] = -ymin
            ymin = 0
        if xmax > xmax_cube:
            padding[1, 1] = xmax - xmax_cube
            xmax = xmax_cube
        if ymax > ymax_cube:
            padding[2, 1] = ymax - ymax_cube
            ymax = ymax_cube

        # Doing the cutout
        if threads == "auto" or threads > 1:
            max_workers = None if threads == "auto" else threads
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                # increase download performance by making remote cutouts inside of a threadpool
                # astropy.io.fits Section class executes a list comprehension, generating a sequence of http range
                # requests (through fsspec) one per our slowest moving dimension (x).
                # https://github.com/astropy/astropy/blob/0a71105fbaa71439206d496a44df11abc0e3ac96/astropy/io/fits/hdu/image.py#L1059
                # By running inside a threadpool, these requests instead execute concurrently which increases cutout
                # throughput. Extremely small cutouts (< 4px in x dim) are not likely to see an improvement
                cutouts = list(pool.map(lambda x: transposed_cube[x, ymin:ymax, :, :], range(xmin, xmax)))
            # stack the list of cutouts
            cutout = np.stack(cutouts)
        else:
            cutout = transposed_cube[xmin:xmax, ymin:ymax, :, :]

        # Move the time axis to the front: (time, x, y)
        img_cutout = cutout[:, :, :, 0].transpose((2, 0, 1))
        if self.product == "SPOC":
            uncert_cutout = cutout[:, :, :, 1].transpose((2, 0, 1))
        else:
            uncert_cutout = None

        # Making the aperture array
        aperture = np.ones((xmax-xmin, ymax-ymin), dtype=np.int32)

        # Adding padding to the cutouts so that it's the expected size
        if padding.any():  # only do if we need to pad
            img_cutout = np.pad(img_cutout, padding, 'constant', constant_values=np.nan)
            if self.product == "SPOC":
                uncert_cutout = np.pad(uncert_cutout, padding, 'constant', constant_values=np.nan)
            aperture = np.pad(aperture, padding[1:], 'constant', constant_values=0)

        if verbose:
            print("Image cutout cube shape: {}".format(img_cutout.shape))
            if self.product == "SPOC":
                print("Uncertainty cutout cube shape: {}".format(uncert_cutout.shape))

        return img_cutout, uncert_cutout, aperture

    def _update_primary_header(self, primary_header):
        """
        Updates the primary header for the cutout target pixel file by filling in
        the object ra and dec with the central cutout coordinates and filling in
        the rest of the TESS target pixel file keywords with 0/empty strings
        as we do not have access to this information.

        This is a TESS-specific function.

        Parameters
        ----------
        primary_header : `~astropy.io.fits.Header`
            The primary header from the cube file that will be modified in place for the cutout.
        """

        # Adding cutout specific headers
        primary_header['CREATOR'] = ('astrocut', 'software used to produce this file')
        primary_header['PROCVER'] = (__version__, 'software version')
        primary_header['FFI_TYPE'] = (self.product, 'the FFI type used to make the cutouts')
        # TODO : The name of FIRST_FFI (and LAST_FFI) is too long to be a header kwd value.
        # Find a way to include these in the headers without breaking astropy (maybe abbreviate?).
        # primary_header['FIRST_FFI'] = (self.first_ffi, 'the FFI used for the primary header
        # keyword values, except TSTOP')
        # primary_header['LAST_FFI'] = (self.last_ffi, 'the FFI used for the TSTOP keyword value')

        primary_header['RA_OBJ'] = (self.center_coord.ra.deg, '[deg] right ascension')
        primary_header['DEC_OBJ'] = (self.center_coord.dec.deg, '[deg] declination')

        timeref = 'SOLARSYSTEM' if self.product == 'SPOC' else None
        tassign = 'SPACECRAFT' if self.product == 'SPOC' else None
        primary_header['TIMEREF'] = (timeref, 'barycentric correction applied to times')
        primary_header['TASSIGN'] = (tassign, 'where time is assigned')
        primary_header['TIMESYS'] = ('TDB', 'time system is Barycentric Dynamical Time (TDB)')
        primary_header['BJDREFI'] = (2457000, 'integer part of BTJD reference date')
        primary_header['BJDREFF'] = (0.00000000, 'fraction of the day in BTJD reference date')
        primary_header['TIMEUNIT'] = ('d', 'time unit for TIME, TSTART and TSTOP')

        # Keywords to strip from the cutout primary header (set before the TICA branch
        # because the TICA branch copies some of these values into new keywords first)
        delete_kwds_wildcards = ['SC_*', 'RMS*', 'A_*', 'AP_*', 'B_*', 'BP*', 'GAIN*', 'TESS_*', 'CD*', 'CT*',
                                 'CRPIX*', 'CRVAL*', 'MJD*']
        delete_kwds = ['COMMENT', 'FILTER', 'TIME', 'EXPTIME', 'ACS_MODE', 'DEC_TARG', 'FLXWIN', 'RA_TARG',
                       'CCDNUM', 'CAMNUM', 'WCSGDF', 'UNITS', 'CADENCE', 'SCIPIXS', 'INT_TIME', 'PIX_CAT',
                       'REQUANT', 'DIFF_HUF', 'PRIM_HUF', 'QUAL_BIT', 'SPM', 'STARTTJD', 'ENDTJD', 'CRM',
                       'TJD_ZERO', 'CRM_N', 'ORBIT_ID', 'MIDTJD']

        if self.product == 'TICA':

            # Adding some missing kwds not in TICA (but in Ames-produced SPOC ffis)
            primary_header['EXTVER'] = ('1', 'extension version number (not format version)')
            primary_header['SIMDATA'] = (False, 'file is based on simulated data')
            primary_header['NEXTEND'] = ('2', 'number of standard extensions')
            primary_header['TSTART'] = (primary_header['STARTTJD'], 'observation start time in TJD of first FFI')
            primary_header['TSTOP'] = (primary_header['ENDTJD'], 'observation stop time in TJD of last FFI')
            primary_header['CAMERA'] = (primary_header['CAMNUM'], 'Camera number')
            primary_header['CCD'] = (primary_header['CCDNUM'], 'CCD chip number')
            primary_header['ASTATE'] = (None, 'archive state F indicates single orbit processing')
            primary_header['CRMITEN'] = (primary_header['CRM'], 'spacecraft cosmic ray mitigation enabled')
            primary_header['CRBLKSZ'] = (None, '[exposures] s/c cosmic ray mitigation block siz')
            primary_header['FFIINDEX'] = (primary_header['CADENCE'], 'number of FFI cadence interval of first FFI')
            primary_header['DATA_REL'] = (None, 'data release version number')

            # TJD (TSTART/TSTOP) + BJDREFI gives a full Julian date for the calendar conversion
            date_obs = Time(primary_header['TSTART']+primary_header['BJDREFI'], format='jd').iso
            date_end = Time(primary_header['TSTOP']+primary_header['BJDREFI'], format='jd').iso
            primary_header['DATE-OBS'] = (date_obs, 'TSTART as UTC calendar date of first FFI')
            primary_header['DATE-END'] = (date_end, 'TSTOP as UTC calendar date of last FFI')

            primary_header['FILEVER'] = (None, 'file format version')
            primary_header['RADESYS'] = (None, 'reference frame of celestial coordinates')
            primary_header['SCCONFIG'] = (None, 'spacecraft configuration ID')
            primary_header['TIMVERSN'] = (None, 'OGIP memo number for file format')

        # Bulk removal with wildcards. Most of these should only live in EXT 1 header.
        for kwd in delete_kwds_wildcards:
            try:
                del primary_header[kwd]
            except KeyError:
                continue

        # Removal of specific kwds not relevant for cutouts.
        # Most likely these describe a single FFI, and not
        # the whole cube, which is misleading because we are
        # working with entire stacks of FFIs. Other keywords
        # are analogs to ones that have already been added
        # to the primary header in the lines above.
        for kwd in delete_kwds:
            try:
                del primary_header[kwd]
            except KeyError:
                continue

        telapse = primary_header.get("TSTOP", 0) - primary_header.get("TSTART", 0)
        # NOTE(review): keyword has a trailing space in the original; astropy normalizes it — confirm intended
        primary_header['TELAPSE '] = (telapse, '[d] TSTOP - TSTART')

        # Updating card comment to be more explicit
        primary_header['DATE'] = (primary_header['DATE'], 'FFI cube creation date')

        # Specifying that some of these headers keyword values are inherited from the first FFI
        if self.product == 'SPOC':
            primary_header['TSTART'] = (primary_header['TSTART'], 'observation start time in TJD of first FFI')
            primary_header['TSTOP'] = (primary_header['TSTOP'], 'observation stop time in TJD of last FFI')
            primary_header['DATE-OBS'] = (primary_header['DATE-OBS'], 'TSTART as UTC calendar date of first FFI')
            primary_header['DATE-END'] = (primary_header['DATE-END'], 'TSTOP as UTC calendar date of last FFI')
            primary_header['FFIINDEX'] = (primary_header['FFIINDEX'], 'number of FFI cadence interval of first FFI')

        # These are all the things in the TESS pipeline tpfs about the object that we can't fill
        primary_header['OBJECT'] = ("", 'string version of target id ')
        primary_header['TCID'] = (0, 'unique tess target identifier')
        primary_header['PXTABLE'] = (0, 'pixel table id')
        primary_header['PMRA'] = (0.0, '[mas/yr] RA proper motion')
        primary_header['PMDEC'] = (0.0, '[mas/yr] Dec proper motion')
        primary_header['PMTOTAL'] = (0.0, '[mas/yr] total proper motion')
        primary_header['TESSMAG'] = (0.0, '[mag] TESS magnitude')
        primary_header['TEFF'] = (0.0, '[K] Effective temperature')
        primary_header['LOGG'] = (0.0, '[cm/s2] log10 surface gravity')
        primary_header['MH'] = (0.0, '[log10([M/H])] metallicity')
        primary_header['RADIUS'] = (0.0, '[solar radii] stellar radius')
        primary_header['TICVER'] = (0, 'TICVER')
        primary_header['TICID'] = (None, 'unique tess target identifier')

    def _add_column_wcs(self, table_header, wcs_dict):
        """
        Adds WCS information for the array columns to the cutout table header.

        Parameters
        ----------
        table_header : `~astropy.io.fits.Header`
            The table header for the cutout table that will be modified in place to include
            WCS information.
        wcs_dict : dict
            Dictionary of wcs keyword/value pairs to be added to each array column in the
            cutout table header.
        """

        wcs_col = False  # says if column is one that requires wcs info

        for kwd in table_header:

            # Adding header descriptions for the table keywords
            if "TTYPE" in kwd:
                table_header.comments[kwd] = "column name"
            elif "TFORM" in kwd:
                table_header.comments[kwd] = "column format"
            elif "TUNIT" in kwd:
                table_header.comments[kwd] = "unit"
            elif "TDISP" in kwd:
                table_header.comments[kwd] = "display format"
            elif "TDIM" in kwd:
                table_header.comments[kwd] = "multi-dimensional array spec"
                wcs_col = True  # if column holds 2D array need to add wcs info
            elif "TNULL" in kwd:
                table_header.comments[kwd] = "null value"

            # Adding wcs info if necessary
            if (kwd[:-1] == "TTYPE") and wcs_col:
                wcs_col = False  # reset
                for wcs_key, (val, com) in wcs_dict.items():
                    # NOTE(review): kwd[-1] handles only single-digit column numbers (<=9 columns) — confirm
                    table_header.insert(kwd, (wcs_key.format(int(kwd[-1])-1), val, com))

    def _add_img_kwds(self, table_header):
        """
        Adding extra keywords to the table header.

        Parameters
        ----------
        table_header : `~astropy.io.fits.Header`
            The table header to add keywords to.  It will be modified in place.
        """

        for key in self.img_kwds:
            # We'll skip these TICA-specific image keywords that are single-FFI specific
            # or just not helpful
            if (key == 'TIME') or (key == 'EXPTIME') or (key == 'FILTER'):
                continue
            table_header[key] = tuple(self.img_kwds[key])

    def _apply_header_inherit(self, hdu_list):
        """
        The INHERIT keyword indicated that keywords from the primary header should be duplicated in
        the headers of all subsequent extensions.  This function performs this addition in place to
        the given hdu list.

        Parameters
        ----------
        hdu_list : `~astropy.io.fits.HDUList`
            The hdu list to apply the INHERIT keyword to.
        """

        primary_header = hdu_list[0].header

        # These keywords are structural and must never be copied into extensions
        reserved_kwds = ["COMMENT", "SIMPLE", "BITPIX", "EXTEND", "NEXTEND"]

        for hdu in hdu_list[1:]:
            if hdu.header.get("INHERIT", False):
                for kwd in primary_header:
                    if (kwd not in hdu.header) and (kwd not in reserved_kwds):
                        hdu.header[kwd] = (primary_header[kwd], primary_header.comments[kwd])

    def _build_tpf(self, cube_fits, img_cube, uncert_cube, cutout_wcs_dict, aperture):
        """
        Building the cutout target pixel file (TPF) and formatting it to match TESS pipeline TPFs.

        Parameters
        ----------
        cube_fits : `~astropy.io.fits.hdu.hdulist.HDUList`
            The cube hdu list.
        img_cube : `numpy.array`
            The untransposed image cutout array
        uncert_cube : `numpy.array`
            The untransposed uncertainty cutout array.
            This value is set to `None` by default for TICA cutouts.
        cutout_wcs_dict : dict
            Dictionary of wcs keyword/value pairs to be added to each array
            column in the cutout table header.
        aperture : `numpy.array`
            The aperture array (an array the size of a single cutout
            that is 1 where there is image data and 0 where there isn't)

        Returns
        -------
        response :  `~astropy.io.fits.HDUList`
            Target pixel file HDU list
        """

        # The primary hdu is just the main header, which is the same
        # as the one on the cube file
        primary_hdu = cube_fits[0]
        self._update_primary_header(primary_hdu.header)

        cols = list()

        # Adding the cutouts
        tform = str(img_cube[0].size) + "E"
        dims = str(img_cube[0].shape[::-1])
        empty_arr = np.zeros(img_cube.shape)

        # Adding the Time relates columns
        start = 'TSTART' if self.product == 'SPOC' else 'STARTTJD'
        stop = 'TSTOP' if self.product == 'SPOC' else 'ENDTJD'
        # TIME is the midpoint of each FFI's start/stop times
        cols.append(fits.Column(name='TIME', format='D', unit='BJD - 2457000, days', disp='D14.7',
                                array=(cube_fits[2].columns[start].array + cube_fits[2].columns[stop].array)/2))

        if self.product == 'SPOC':
            cols.append(fits.Column(name='TIMECORR', format='E', unit='d', disp='E14.7',
                                    array=cube_fits[2].columns['BARYCORR'].array))

        # Adding CADENCENO as zeros for SPOC b/c we don't have this info
        cadence_array = np.zeros(img_cube.shape)[:, 0, 0] if self.product == 'SPOC'\
            else cube_fits[2].columns['CADENCE'].array
        cols.append(fits.Column(name='CADENCENO', format='J', disp='I10', array=cadence_array))

        # Adding counts (-1 b/c we don't have data)
        cols.append(fits.Column(name='RAW_CNTS', format=tform.replace('E', 'J'), unit='count', dim=dims,
                                disp='I8', array=empty_arr-1, null=-1))

        # Adding flux and flux_err (data we actually have!)
        pixel_unit = 'e-/s' if self.product == 'SPOC' else 'e-'
        flux_err_array = uncert_cube if self.product == 'SPOC' else empty_arr
        cols.append(fits.Column(name='FLUX', format=tform, dim=dims, unit=pixel_unit, disp='E14.7',
                                array=img_cube))
        cols.append(fits.Column(name='FLUX_ERR', format=tform, dim=dims, unit=pixel_unit, disp='E14.7',
                                array=flux_err_array))

        # Adding the background info (zeros b.c we don't have this info)
        cols.append(fits.Column(name='FLUX_BKG', format=tform, dim=dims, unit=pixel_unit, disp='E14.7',
                                array=empty_arr))
        cols.append(fits.Column(name='FLUX_BKG_ERR', format=tform, dim=dims, unit=pixel_unit, disp='E14.7',
                                array=empty_arr))

        # Adding the quality flags
        data_quality = 'DQUALITY' if self.product == 'SPOC' else 'QUAL_BIT'
        cols.append(fits.Column(name='QUALITY', format='J', disp='B16.16',
                                array=cube_fits[2].columns[data_quality].array))

        # Adding the position correction info (zeros b.c we don't have this info)
        cols.append(fits.Column(name='POS_CORR1', format='E', unit='pixel', disp='E14.7',
                                array=empty_arr[:, 0, 0]))
        cols.append(fits.Column(name='POS_CORR2', format='E', unit='pixel', disp='E14.7',
                                array=empty_arr[:, 0, 0]))

        # Adding the FFI_FILE column (not in the pipeline tpfs)
        cols.append(fits.Column(name='FFI_FILE', format='38A', unit='pixel',
                                array=cube_fits[2].columns['FFI_FILE'].array))

        # making the table HDU
        table_hdu = fits.BinTableHDU.from_columns(cols)
        table_hdu.header['EXTNAME'] = 'PIXELS'
        table_hdu.header['INHERIT'] = True

        # Adding the wcs keywords to the columns and removing from the header
        self._add_column_wcs(table_hdu.header, cutout_wcs_dict)

        # Adding the extra image keywords
        self._add_img_kwds(table_hdu.header)

        # Building the aperture HDU
        aperture_hdu = fits.ImageHDU(data=aperture)
        aperture_hdu.header['EXTNAME'] = 'APERTURE'
        for kwd, val, cmt in self.cutout_wcs.to_header().cards:
            aperture_hdu.header.set(kwd, val, cmt)

        # Adding extra aperture keywords (TESS specific)
        aperture_hdu.header.set("NPIXMISS", None, "Number of op. aperture pixels not collected")
        aperture_hdu.header.set("NPIXSAP", None, "Number of pixels in optimal aperture")

        # Adding goodness-of-fit keywords
        for key in self.cutout_wcs_fit:
            aperture_hdu.header[key] = tuple(self.cutout_wcs_fit[key])

        aperture_hdu.header['INHERIT'] = True

        cutout_hdu_list = fits.HDUList([primary_hdu, table_hdu, aperture_hdu])
        self._apply_header_inherit(cutout_hdu_list)

        return cutout_hdu_list
[docs] def cube_cut( self, cube_file, coordinates, cutout_size, product="SPOC", target_pixel_file=None, output_path=".", threads: Union[int, Literal["auto"]] = 1, verbose=False, ): """ Takes a cube file (as created by `~astrocut.CubeFactory`), and makes a cutout target pixel file of the given size around the given coordinates. The target pixel file is formatted like a TESS pipeline target pixel file. Parameters ---------- cube_file : str The cube file containing all the images to be cutout. Must be in the format returned by ~astrocut.make_cube. coordinates : str or `astropy.coordinates.SkyCoord` object The position around which to cutout. It may be specified as a string ("ra dec" in degrees) or as the appropriate `~astropy.coordinates.SkyCoord` object. cutout_size : int, array-like, `~astropy.units.Quantity` The size of the cutout array. If ``cutout_size`` is a scalar number or a scalar `~astropy.units.Quantity`, then a square cutout of ``cutout_size`` will be created. If ``cutout_size`` has two elements, they should be in ``(ny, nx)`` order. Scalar numbers in ``cutout_size`` are assumed to be in units of pixels. `~astropy.units.Quantity` objects must be in pixel or angular units. product : str The product type to make the cutouts from. Can either be 'SPOC' or 'TICA' (default is 'SPOC'). target_pixel_file : str Optional. The name for the output target pixel file. If no name is supplied, the file will be named: ``<cube_file_base>_<ra>_<dec>_<cutout_size>_astrocut.fits`` output_path : str Optional. The path where the output file is saved. The current directory is default. threads : int, "auto", default=1 Number of threads to use when making remote (e.g. s3) cutouts, will not use threads for local access <=1 disables the threadpool, >1 sets threadpool to the specified number of threads, "auto" uses `concurrent.futures.ThreadPoolExecutor`'s default: cpu_count + 4, limit to max of 32 verbose : bool Optional. If true intermediate information is printed. 
Returns ------- response: string or None If successfull, returns the path to the target pixel file, if unsuccessful returns None. """ if verbose: start_time = time() # Declare the product type being used to make the cutouts self.product = product.upper() warnings.filterwarnings("ignore", category=wcs.FITSFixedWarning) fits_options: Dict[str, Any] = {"mode": "denywrite", "lazy_load_hdus": True} if cube_file.startswith("s3://"): fits_options["use_fsspec"] = True # block size should be: # block_size <= m * hdul[1].section.shape[2] * hdul[1].section.shape[3] * 4 bytes fits_options["fsspec_kwargs"] = {"default_block_size": 10_000, "anon": True} # only use .section for remote data cube_data_prop = "section" else: fits_options["memmap"] = True cube_data_prop = "data" # disable threading for local storage access threads = 1 with fits.open(cube_file, **fits_options) as cube: # Get the info we need from the data table self._parse_table_info(cube[2].data, verbose) if isinstance(coordinates, SkyCoord): self.center_coord = coordinates else: self.center_coord = SkyCoord(coordinates, unit='deg') if verbose: print("Cutout center coordinate: {},{}".format(self.center_coord.ra.deg, self.center_coord.dec.deg)) # Making size into an array [ny, nx] if np.isscalar(cutout_size): cutout_size = np.repeat(cutout_size, 2) if isinstance(cutout_size, u.Quantity): cutout_size = np.atleast_1d(cutout_size) if len(cutout_size) == 1: cutout_size = np.repeat(cutout_size, 2) if len(cutout_size) > 2: warnings.warn("Too many dimensions in cutout size, only the first two will be used.", InputWarning) cutout_size = cutout_size[:2] # Get cutout limits self._get_cutout_limits(cutout_size) if verbose: print("xmin,xmax: {}".format(self.cutout_lims[1])) print("ymin,ymax: {}".format(self.cutout_lims[0])) # Make the cutout img_cutout, uncert_cutout, aperture = self._get_cutout(getattr(cube[1], cube_data_prop), threads=threads, verbose=verbose) # Get cutout wcs info cutout_wcs_full = 
self._get_full_cutout_wcs(cube[2].header) max_dist, sigma = self._fit_cutout_wcs(cutout_wcs_full, img_cutout.shape[1:]) if verbose: print("Maximum distance between approximate and true location: {}".format(max_dist)) print("Error in approximate WCS (sigma): {}".format(sigma)) cutout_wcs_dict = self._get_cutout_wcs_dict() # Build the TPF tpf_object = self._build_tpf(cube, img_cutout, uncert_cutout, cutout_wcs_dict, aperture) if verbose: write_time = time() if not target_pixel_file: _, flename = os.path.split(cube_file) width = self.cutout_lims[0, 1]-self.cutout_lims[0, 0] height = self.cutout_lims[1, 1]-self.cutout_lims[1, 0] target_pixel_file = "{}_{:7f}_{:7f}_{}x{}_astrocut.fits".format(flename.rstrip('.fits').rstrip("-cube"), self.center_coord.ra.value, self.center_coord.dec.value, width, height) target_pixel_file = os.path.join(output_path, target_pixel_file) if verbose: print("Target pixel file: {}".format(target_pixel_file)) # Make sure the output directory exists if not os.path.exists(output_path): os.makedirs(output_path) # Write the TPF tpf_object.writeto(target_pixel_file, overwrite=True, checksum=True) if verbose: print("Write time: {:.2} sec".format(time()-write_time)) print("Total time: {:.2} sec".format(time()-start_time)) return target_pixel_file