Source code for astrocut.asdf_cutout

import sys
import warnings
from contextlib import nullcontext
from copy import copy
from datetime import date
from pathlib import Path
from time import monotonic
from typing import List, Optional, Tuple, Union

import asdf
import gwcs
import numpy as np
from asdf.tags.core.ndarray import NDArrayType
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.modeling import models
from astropy.nddata.utils import Cutout2D, NoOverlapError
from astropy.units import Quantity
from astropy.utils.decorators import deprecated_renamed_argument
from astropy.wcs import WCS
from packaging.version import Version
from s3path import S3Path

from . import __version__, log
from .exceptions import DataWarning, InvalidInputError, InvalidQueryError, ModuleWarning
from .image_cutout import ImageCutout


class ASDFCutout(ImageCutout):
    """
    Class for creating cutouts from ASDF files.

    Parameters
    ----------
    input_files : list
        List of input image files.
    coordinates : str | `~astropy.coordinates.SkyCoord`
        Coordinates of the center of the cutout.
    cutout_size : int | array | list | tuple | `~astropy.units.Quantity`
        Size of the cutout array.
    fill_value : int | float
        Value to fill the cutout with if the cutout is outside the image. Default is np.nan.
        If the input data array has an integer data type, the fill value will be converted
        to an integer (e.g., a fill value of 1.0 will be converted to 1). If the conversion
        fails (e.g., for NaN or infinity), it will default to 0.
    key : str
        Optional, default None. Access key ID for S3 file system.
    secret : str
        Optional, default None. Secret access key for S3 file system.
    token : str
        Optional, default None. Security token for S3 file system.
    lite : bool
        Optional, default True. If True, the cutout will be created in "lite" mode, which means
        that it will only contain the data and an updated world coordinate system. If False,
        cutouts will be made from all arrays in the input file (e.g., data, error, uncertainty,
        variance, etc.) where the last two dimensions match the shape of the science data array.
        It also preserves all of the metadata from the input file.
    verbose : bool
        If True, log messages are printed to the console.

    Attributes
    ----------
    cutouts : list
        The cutouts as a list of `astropy.nddata.Cutout2D` objects.
    cutouts_by_file : dict
        The cutouts as `astropy.nddata.Cutout2D` objects stored by input filename.
    fits_cutouts : list
        The cutouts as a list `astropy.io.fits.HDUList` objects.
    asdf_cutouts : list
        The cutouts as a list of `asdf.AsdfFile` objects.
    image_cutouts : list
        List of `~PIL.Image.Image` objects representing the cutouts.

    Methods
    -------
    cutout()
        Generate cutouts from a list of input images.
    write_as_fits(output_dir)
        Write the cutouts to disk or memory in FITS format.
    write_as_asdf(output_dir)
        Write the cutouts to disk or memory in ASDF format.
    write_as_zip(output_dir, filename, output_format)
        Package the ASDF or FITS cutouts into a zip archive.
    """

    def __init__(
        self,
        input_files: List[Union[str, Path, S3Path]],
        coordinates: Union[SkyCoord, str],
        cutout_size: Union[int, np.ndarray, Quantity, List[int], Tuple[int]] = 25,
        fill_value: Union[int, float] = np.nan,
        key: Optional[str] = None,
        secret: Optional[str] = None,
        token: Optional[str] = None,
        lite: Optional[bool] = True,
        verbose: bool = False,
    ):
        # Superclass constructor
        super().__init__(input_files, coordinates, cutout_size, fill_value, verbose=verbose)

        # Must be using Python 3.11 or higher to support stdatamodels and ASDF-in-FITS embedding
        self._py311_or_higher = sys.version_info >= (3, 11)
        self._asdf_in_fits = None  # Will be set to the asdf_in_fits module if available

        # Assign AWS credential attributes
        self._key = key
        self._secret = secret
        self._token = token

        self._mission_kwd = "roman"

        self.cutouts = []  # Public attribute to hold `Cutout2D` objects
        self._asdf_cutouts = None  # Store ASDF objects
        self._fits_cutouts = None  # Store FITS objects
        self._sliced_gwcs_objects = []  # Store sliced GWCS objects for lite mode
        self._asdf_trees = []  # Store ASDF trees for each cutout
        self._lite = lite  # Flag for lite mode
        self._primary_header_template = None  # Optional template for primary header keywords
        self._fill_value_cache = {}  # Cache for converted fill values based on input data types

        # Make cutouts
        self.cutout()

    def _check_asdf_in_fits_support(self):
        """
        Resolve the optional ``stdatamodels.asdf_in_fits`` module.

        On success the module is stored in ``self._asdf_in_fits``. When the Python version is
        too old, the package cannot be imported, or its version is below 4.1.0, a
        `ModuleWarning` is emitted and ``self._asdf_in_fits`` stays None.
        """
        if self._asdf_in_fits is not None:
            return

        if not self._py311_or_higher:
            warnings.warn(
                "ASDF-in-FITS embedding requires Python 3.11 or higher. Skipping embedding for these cutouts.",
                ModuleWarning,
            )
            return

        # Try to import stdatamodels for ASDF-in-FITS embedding
        try:
            from stdatamodels import __version__ as stdata_version
            from stdatamodels import asdf_in_fits
        except ImportError:
            warnings.warn(
                "The `stdatamodels` package cannot be imported; ASDF-in-FITS embedding will be "
                "skipped for these cutouts. Install the optional dependency with: "
                'pip install "astrocut[all]" or pip install stdatamodels>=4.1.0',
                ModuleWarning,
            )
            return

        # Check version of stdatamodels
        if Version(stdata_version) < Version("4.1.0"):
            warnings.warn(
                "The `stdatamodels` package is not available in the correct version (>=4.1.0); "
                "ASDF-in-FITS embedding will be skipped for these cutouts. Install the optional "
                'dependency with: pip install "astrocut[all]" or pip install stdatamodels>=4.1.0',
                ModuleWarning,
            )
        else:
            self._asdf_in_fits = asdf_in_fits

    @property
    def fits_cutouts(self) -> List[fits.HDUList]:
        """
        Return the cutouts as a list `astropy.io.fits.HDUList` objects.

        The list is built on first access and cached. Each HDUList holds a primary HDU with
        provenance keywords and a "CUTOUT" image HDU; when ASDF-in-FITS support is available
        the metadata tree is embedded as well.
        """
        if self._fits_cutouts is not None:
            return self._fits_cutouts

        # Probe for ASDF-in-FITS support once, rather than once per file, so an unsupported
        # environment produces a single import attempt and a single warning.
        self._check_asdf_in_fits_support()

        # Build the PrimaryHDU keyword template once; it is identical for every file
        if self._primary_header_template is None:
            self._primary_header_template = fits.Header(
                [
                    ("ORIGIN", "STScI/MAST"),
                    ("PROCVER", __version__),
                    ("RA_OBJ", self._coordinates.ra.deg),
                    ("DEC_OBJ", self._coordinates.dec.deg),
                ]
            )

        fits_cutouts = []
        today_str = str(date.today())
        for i, (file, cutouts) in enumerate(self.cutouts_by_file.items()):
            cutout = cutouts[0]
            if self._lite:
                # Tree should only include sliced WCS and original filename
                tree = {
                    self._mission_kwd: {"meta": {"wcs": self._sliced_gwcs_objects[i], "orig_file": str(file)}}
                }
            else:
                source_tree = self._asdf_trees[i]
                # Build a metadata-only tree for FITS embedding without mutating cached ASDF trees.
                tree = {self._mission_kwd: {"meta": source_tree[self._mission_kwd]["meta"]}}

            primary_header = self._primary_header_template.copy()
            primary_header["DATE"] = today_str  # Update date to current date for each file
            primary_hdu = fits.PrimaryHDU(header=primary_header)

            # Build ImageHDU with cutout data and WCS
            image_hdu = fits.ImageHDU(data=cutout.data, header=cutout.wcs.to_header(relax=True))
            image_hdu.header["ORIG_FLE"] = str(file)  # Add original file to header
            image_hdu.header["EXTNAME"] = "CUTOUT"

            hdul = fits.HDUList([primary_hdu, image_hdu])
            if self._asdf_in_fits is not None:
                hdul = self._asdf_in_fits.to_hdulist(tree, hdul)
            fits_cutouts.append(hdul)

        self._fits_cutouts = fits_cutouts
        return fits_cutouts

    @property
    def asdf_cutouts(self) -> List[asdf.AsdfFile]:
        """
        Return the cutouts as a list of `asdf.AsdfFile` objects.

        The list is built on first access and cached. Every file gets a history entry
        recording the cutout shape and the requested sky coordinates.
        """
        if self._asdf_cutouts is not None:
            return self._asdf_cutouts

        asdf_cutouts = []
        for i, (file, cutouts) in enumerate(self.cutouts_by_file.items()):
            cutout = cutouts[0]
            if self._lite:
                tree = {
                    self._mission_kwd: {
                        "meta": {"wcs": self._sliced_gwcs_objects[i], "orig_file": str(file)},
                        "data": cutout.data,
                    }
                }
            else:
                tree = self._asdf_trees[i]

            # Create the AsdfFile object and add history to it
            af = asdf.AsdfFile(tree)
            af.add_history_entry(
                f"Cutout of size {cutout.shape} at sky coordinates "
                f"({self._coordinates.ra.value}, {self._coordinates.dec.value})",
                software={
                    "name": "astrocut",
                    "author": "Space Telescope Science Institute",
                    "version": __version__,
                    "homepage": "https://astrocut.readthedocs.io/en/latest/",
                },
            )
            asdf_cutouts.append(af)

        self._asdf_cutouts = asdf_cutouts
        return asdf_cutouts

    def _get_cloud_file(self, input_file: Union[str, S3Path]):
        """
        Open a cloud-hosted file using fsspec.

        Parameters
        ----------
        input_file : str | S3Path
            The input file S3 URI.

        Returns
        -------
        file-like object
            The object returned by ``fsspec.open`` in binary read mode; the caller uses it as
            a context manager to obtain the file handle.
        """
        # Import fsspec here to avoid adding it as a dependency for users who don't need cloud support
        import fsspec

        # An S3Path's filesystem-path form carries no "s3://" scheme, so hand fsspec the
        # explicit URI to make sure the S3 protocol is selected.
        if isinstance(input_file, S3Path):
            input_file = input_file.as_uri()

        fsspec_kwargs = {}
        if self._key is None and self._secret is None and self._token is None:
            # No credentials supplied: request anonymous access
            fsspec_kwargs["anon"] = True
        else:
            if self._key is not None:
                fsspec_kwargs["key"] = self._key
            if self._secret is not None:
                fsspec_kwargs["secret"] = self._secret
            if self._token is not None:
                fsspec_kwargs["token"] = self._token

        return fsspec.open(input_file, mode="rb", **fsspec_kwargs)

    def _get_fill_value(self, dtype: np.dtype) -> Union[int, float]:
        """
        Get the appropriate fill value for a given data type, converting if necessary.

        Results are cached per dtype in ``self._fill_value_cache``.

        Parameters
        ----------
        dtype : np.dtype
            The data type of the input array.

        Returns
        -------
        fill_value : int | float
            The fill value converted to the appropriate type if necessary. For integer dtypes
            a non-integer fill value is converted with ``int()``; if that fails the result is 0.
        """
        if dtype in self._fill_value_cache:
            return self._fill_value_cache[dtype]

        fill_value = self._fill_value
        if np.issubdtype(dtype, np.integer) and not isinstance(fill_value, int):
            log.debug("Input data array has integer data type, converting fill_value to integer.")
            try:
                fill_value = int(self._fill_value)
            except (ValueError, OverflowError):
                # int(nan) raises ValueError and int(inf) raises OverflowError;
                # both mean the value has no integer representation.
                fill_value = 0  # Default to 0 if conversion fails

        self._fill_value_cache[dtype] = fill_value
        return fill_value

    def _make_cutout(self, array: np.ndarray, position: tuple, wcs: WCS) -> Cutout2D:
        """
        Helper to generate a Cutout2D and return plain ndarray data.

        Parameters
        ----------
        array : np.ndarray
            The input data array.
        position : tuple
            The (x, y) position of the cutout center.
        wcs : WCS
            The WCS object associated with the input array.

        Returns
        -------
        cutout : Cutout2D
            The generated cutout.
        """
        # If the array has an integer data type, fill_value must be an integer
        fill_value = self._get_fill_value(array.dtype)

        cutout = Cutout2D(
            array,
            position=position,
            wcs=wcs,
            # The two stored cutout-size entries are passed to Cutout2D in swapped order
            size=(self._cutout_size[1], self._cutout_size[0]),
            mode="partial",
            fill_value=fill_value,
            # Keep cutouts detached from source arrays so downstream serialization
            # does not preserve references to full-size parent data.
            copy=True,
        )

        # Strip units if present
        if isinstance(cutout.data, Quantity):
            cutout.data = cutout.data.value

        return cutout

    def _apply_cutout_slices(self, array: np.ndarray, data_cutout: Cutout2D) -> np.ndarray:
        """
        Apply an existing Cutout2D footprint to another aligned array.

        Parameters
        ----------
        array : np.ndarray
            The input array to apply the cutout slices to.
        data_cutout : Cutout2D
            The Cutout2D object containing the original cutout slices.

        Returns
        -------
        result : np.ndarray
            A new array whose leading dimensions match ``array`` and whose last two dimensions
            match the cutout shape. The overlapping region is copied from ``array``; everything
            else holds the fill value.
        """
        orig_slices = data_cutout.slices_original
        cutout_slices = data_cutout.slices_cutout
        out_shape = data_cutout.data.shape
        fill_value = self._get_fill_value(array.dtype)

        # Build a result array for the cutout filled with the fill value
        result = np.full(
            array.shape[:-2] + out_shape,
            fill_value,
            dtype=array.dtype,
        )

        # Insert original data into the cutout region of the result array
        result[..., cutout_slices[0], cutout_slices[1]] = array[..., orig_slices[0], orig_slices[1]]
        return result

    def _get_cutout_data(self, mission_tree: dict, wcs: WCS, pixel_coords: Tuple[int, int]) -> Cutout2D:
        """
        Get the cutout data from the input image.

        In non-lite mode, every array in ``mission_tree`` whose last two dimensions match the
        science data is replaced in place by its cutout.

        Parameters
        ----------
        mission_tree : dict
            The mission-specific tree of the input file.
        wcs : `~astropy.wcs.WCS`
            The approximated WCS of the input image.
        pixel_coords : tuple
            The pixel coordinates closest to the center of the cutout.

        Returns
        -------
        img_cutout : `~astropy.nddata.Cutout2D`
            The cutout object.
        """
        # Shape of data array
        mission_data = mission_tree["data"]
        data_shape = mission_data.shape

        # Make data cutout
        data_cutout = self._make_cutout(mission_data, pixel_coords, wcs)

        # If full cutout, apply the same cutout slices to other arrays in the mission tree that
        # are aligned with the data array, i.e. have the same shape in the last two dimensions
        if not self._lite:
            # Iterate over a snapshot because entries are reassigned inside the loop
            for key, obj in list(mission_tree.items()):
                if not isinstance(obj, (np.ndarray, NDArrayType)):
                    continue  # Skip non-array objects

                shape = obj.shape
                if shape[-2:] != data_shape[-2:]:
                    continue  # Skip arrays not aligned with science data

                log.debug("Original %s shape: %s", key, shape)
                arr_cutout = self._apply_cutout_slices(obj, data_cutout)
                mission_tree[key] = arr_cutout
                log.debug("%s cutout shape: %s", key, arr_cutout.shape)

        return data_cutout

    def _slice_gwcs(self, cutout: Cutout2D, gwcs: gwcs.wcs.WCS) -> gwcs.wcs.WCS:
        """
        Slice the original gwcs object.

        "Slices" the original gwcs object down to the cutout shape. This is a hack until
        proper gwcs slicing is in place a la fits WCS slicing. The cutout boundaries in the
        original image array are read from ``cutout.slices_original``, which this method
        indexes as (y slice, x slice).

        Parameters
        ----------
        cutout : astropy.nddata.Cutout2D
            The cutout object.
        gwcs : gwcs.wcs.WCS
            The original GWCS from the input image.

        Returns
        -------
        gwcs.wcs.WCS
            The sliced GWCS object.
        """
        # Create copy of original gwcs object
        tmp = copy(gwcs)

        # Get the cutout array bounds and create a new shift transform to the cutout
        # Add the new transform to the gwcs
        slices = cutout.slices_original
        xmin, xmax = slices[1].start, slices[1].stop
        ymin, ymax = slices[0].start, slices[0].stop
        shape = (xmax - xmin, ymax - ymin)
        offsets = models.Shift(xmin, name="cutout_offset1") & models.Shift(ymin, name="cutout_offset2")
        tmp.insert_transform("detector", offsets, after=True)

        # Modify the gwcs bounding box to the cutout shape
        tmp.bounding_box = ((0, shape[0] - 1), (0, shape[1] - 1))
        tmp.pixel_shape = shape
        tmp.array_shape = shape[::-1]
        return tmp

    def _cutout_file(self, file: Union[str, Path, S3Path]):
        """
        Create a cutout from a single input file.

        Files that lack the mission keyword, a GWCS object, or a data array, whose footprint
        does not overlap the cutout, or whose cutout is empty are skipped with a `DataWarning`.

        Parameters
        ----------
        file : str | Path | S3Path
            The input file to create a cutout from.
        """
        input_file = file

        # If file comes from AWS cloud bucket, open it with fsspec and pass the file handle to ASDF.
        is_cloud = (isinstance(file, str) and file.startswith("s3://")) or isinstance(file, S3Path)
        asdf_file = self._get_cloud_file(file) if is_cloud else nullcontext(file)

        with asdf_file as file_handle:
            with asdf.open(file_handle) as af:
                # Load the data from the input file
                tree = af.tree
                mission_tree = tree[self._mission_kwd] if self._mission_kwd in tree else None
                if mission_tree is None:
                    warnings.warn(
                        f"File {input_file} does not contain the expected mission keyword '{self._mission_kwd}'. "
                        "Skipping...",
                        DataWarning,
                    )
                    return

                # Skip if the file does not contain a GWCS object. A missing "meta" section is
                # treated the same way instead of raising KeyError.
                meta = mission_tree.get("meta", {})
                gwcs_obj = meta.get("wcs", None)
                if gwcs_obj is None:
                    warnings.warn(f"File {input_file} does not contain a GWCS object. Skipping...", DataWarning)
                    return

                # Skip if there is no science array to cut from
                if "data" not in mission_tree:
                    warnings.warn(f"File {input_file} does not contain a data array. Skipping...", DataWarning)
                    return

                # Collect the metadata plus every array aligned with the science data
                new_mission_tree = {"meta": meta}
                new_tree = {self._mission_kwd: new_mission_tree}
                data_shape = mission_tree["data"].shape
                for key, value in mission_tree.items():
                    if isinstance(value, (np.ndarray, NDArrayType)):
                        if value.shape[-2:] == data_shape[-2:]:
                            new_mission_tree[key] = value

                # Get closest pixel coordinates and approximated WCS
                pixel_coords, wcs = get_center_pixel(
                    gwcs_obj, self._coordinates.ra.value, self._coordinates.dec.value
                )

                # Create the cutout
                try:
                    data_cutout = self._get_cutout_data(new_mission_tree, wcs, pixel_coords)
                except NoOverlapError:
                    warnings.warn(
                        f"Cutout footprint does not overlap with data in {input_file}, skipping...", DataWarning
                    )
                    return

                # Check that there is data in the cutout image
                data = data_cutout.data
                if np.isnan(data).all() or not np.any(data):
                    warnings.warn(f"Cutout of {input_file} contains no data, skipping...", DataWarning)
                    return

                # Store the Cutout2D object
                self.cutouts.append(data_cutout)

                # Slice the GWCS to the cutout and store it for use in lite mode and in ASDF trees
                sliced_gwcs = self._slice_gwcs(data_cutout, gwcs_obj)
                self._sliced_gwcs_objects.append(sliced_gwcs)

                if not self._lite:
                    new_mission_tree["meta"]["wcs"] = sliced_gwcs
                    # Store the original filename in the tree metadata for the ASDF cutout
                    new_mission_tree["meta"]["orig_file"] = str(input_file)
                    self._asdf_trees.append(new_tree)

                # Store cutout with filename
                self.cutouts_by_file[input_file] = [data_cutout]

    def cutout(self) -> List[Cutout2D]:
        """
        Generate cutouts from a list of input images.

        This is called by the constructor. Calling it again recomputes the cutouts from
        scratch; previously accumulated results and cached FITS/ASDF objects are discarded.

        Returns
        -------
        cutouts : list
            The cutouts as a list of `astropy.nddata.Cutout2D` objects.

        Raises
        ------
        InvalidQueryError
            If no cutouts contain data.
        """
        # Track start time
        start_time = monotonic()

        # Reset per-run state so a repeated call does not append duplicates or leave
        # stale cached output objects behind.
        self.cutouts = []
        self._sliced_gwcs_objects = []
        self._asdf_trees = []
        self._asdf_cutouts = None
        self._fits_cutouts = None
        self.cutouts_by_file.clear()

        # Cutout each input file
        for file in self._input_files:
            self._cutout_file(file)

        # If no cutouts contain data, raise exception
        if not self.cutouts:
            raise InvalidQueryError("Cutout contains no data! (Check image footprint.)")

        # Log total time elapsed
        log.debug("Total time: %.2f sec", monotonic() - start_time)

        return self.cutouts

    def _make_cutout_filename(self, file: str, output_format: str) -> str:
        """
        Generate a standardized filename for the cutout.

        Overrides the superclass method to include the '_lite' tag if applicable and the
        output format.

        Parameters
        ----------
        file : str
            The input file name.
        output_format : str
            The output format to write the cutout to. Options are '.fits' and '.asdf'.

        Returns
        -------
        filename : str
            The generated filename for the cutout.
        """
        return "{}_{:.7f}_{:.7f}_{}-x-{}{}_astrocut{}".format(
            Path(file).stem,
            self._coordinates.ra.value,
            self._coordinates.dec.value,
            str(self._cutout_size[0]).replace(" ", ""),
            str(self._cutout_size[1]).replace(" ", ""),
            "_lite" if self._lite else "",
            output_format,
        )

    def _write_as_format(self, output_format: str, output_dir: Union[str, Path] = ".") -> List[str]:
        """
        Write the cutout to disk in the specified output format.

        Parameters
        ----------
        output_format : str
            The output format to write the cutout to. Options are '.fits' and '.asdf'.
        output_dir : str | Path
            The output directory to write the cutouts to. It is created if it does not exist.

        Returns
        -------
        cutout_paths : list
            The POSIX-style path(s) to the written cutout file(s).
        """
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        cutout_paths = []  # List to store paths to cutout files
        cutouts = self.fits_cutouts if output_format == ".fits" else self.asdf_cutouts
        for file, cutout in zip(self.cutouts_by_file.keys(), cutouts):
            # Determine the output path
            filename = self._make_cutout_filename(file, output_format)
            cutout_path = Path(output_dir, filename)

            # Write the cutout to disk in the specified format
            if output_format == ".fits":
                # Warnings raised while writing the FITS file are deliberately suppressed
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore")
                    cutout.writeto(cutout_path, overwrite=True, checksum=True)
            elif output_format == ".asdf":
                cutout.write_to(cutout_path)

            cutout_paths.append(cutout_path.as_posix())

        log.debug("Cutout filepaths: %s", cutout_paths)
        return cutout_paths

    def write_as_fits(self, output_dir: Union[str, Path] = ".") -> List[str]:
        """
        Write the cutouts to disk in FITS format.

        Parameters
        ----------
        output_dir : str | Path
            The output directory to write the cutouts to. Defaults to the current directory.

        Returns
        -------
        list
            A list of paths to the cutout FITS files.
        """
        return self._write_as_format(output_format=".fits", output_dir=output_dir)

    def write_as_asdf(self, output_dir: Union[str, Path] = ".", validate_output: bool = True) -> List[str]:
        """
        Write the cutouts to disk in ASDF format.

        Parameters
        ----------
        output_dir : str | Path
            The output directory to write the cutouts to. Defaults to the current directory.
        validate_output : bool
            Accepted for interface compatibility. Defaults to True.
            NOTE(review): this flag is currently not forwarded to the writer, so it has no
            effect on how the files are written.

        Returns
        -------
        list
            A list of paths to the cutout ASDF files.
        """
        return self._write_as_format(output_format=".asdf", output_dir=output_dir)

    def write_as_zip(
        self,
        output_dir: Union[str, Path] = ".",
        filename: Union[str, Path, None] = None,
        *,
        output_format: str = ".asdf",
    ) -> str:
        """
        Package the ASDF or FITS cutouts into a zip archive without writing intermediates.

        Parameters
        ----------
        output_dir : str | Path, optional
            Directory where the zip will be created. Default '.'.
        filename : str | Path | None, optional
            Name (or path) of the output zip file. If not provided, defaults to
            'astrocut_{ra}_{dec}_{size}.zip'. If provided without a '.zip' suffix, the suffix
            is added automatically.
        output_format : str, optional
            Either '.asdf' (default) or '.fits'. Determines which in-memory representation
            is zipped. Case, surrounding whitespace and a missing leading dot are tolerated.

        Returns
        -------
        str
            Path to the created zip file.

        Raises
        ------
        InvalidInputError
            If ``output_format`` is neither '.asdf' nor '.fits'.
        """
        fmt = output_format.lower().strip()
        fmt = "." + fmt if not fmt.startswith(".") else fmt
        if fmt not in (".asdf", ".fits"):
            raise InvalidInputError("File format must be either '.asdf' or '.fits'")

        def build_entries():
            # Evaluated lazily so the in-memory objects are only built when the zip is written
            objs = self.fits_cutouts if fmt == ".fits" else self.asdf_cutouts
            for file, obj in zip(self.cutouts_by_file, objs):
                yield self._make_cutout_filename(file, fmt), obj

        return self._write_cutouts_to_zip(output_dir=output_dir, filename=filename, build_entries=build_entries)
def get_center_pixel(gwcsobj: gwcs.wcs.WCS, ra: float, dec: float) -> Tuple[Tuple[int, int], WCS]:
    """
    Locate the pixel position on an input image that corresponds to sky coordinates.

    Parameters
    ----------
    gwcsobj : gwcs.wcs.WCS
        The GWCS object of the input image.
    ra : float
        The right ascension of the input coordinates.
    dec : float
        The declination of the input coordinates.

    Returns
    -------
    pixel_coords : tuple
        The two outputs of the inverse GWCS transform, in the order returned, each converted
        to a plain Python float (no rounding is applied).
    wcs_updated : `~astropy.wcs.WCS`
        The approximated FITS WCS object built from the GWCS SIP header.
    """

    def _first_as_float(values):
        # Drop units when the transform hands back a Quantity, then take the single element
        raw = values.value if isinstance(values, Quantity) else values
        return float(raw[0])

    # Approximate the GWCS with a FITS SIP header
    fits_header = gwcsobj.to_fits_sip()

    # Fill in placeholder values for keywords the header lacks.
    # Otherwise, it won't work with astropy.wcs tools (TODO: Figure out why. What are these keywords for?)
    for keyword in ("cpdis1", "cpdis2", "det2im1", "det2im2", "sip"):
        if keyword not in fits_header:
            fits_header[keyword] = "na"

    # Build the FITS WCS from the patched header, silencing warnings raised during construction
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        wcs_updated = WCS(fits_header)

    # Run the inverse transform on one-element arrays, ignoring the bounding box
    first, second = gwcsobj.invert(np.atleast_1d(ra), np.atleast_1d(dec), with_bounding_box=False)
    pixel_coords = (_first_as_float(first), _first_as_float(second))

    return pixel_coords, wcs_updated
@deprecated_renamed_argument(
    "output_file",
    None,
    "1.0.0",
    warning_type=DeprecationWarning,
    message="`output_file` is non-operational and will be removed in a future version.",
)
def asdf_cut(
    input_files: List[Union[str, Path, S3Path]],
    ra: float,
    dec: float,
    cutout_size: int = 25,
    output_file: Union[str, Path] = "example_roman_cutout.fits",
    write_file: bool = True,
    fill_value: Union[int, float] = np.nan,
    output_dir: Union[str, Path] = ".",
    output_format: str = ".asdf",
    key: str = None,
    secret: str = None,
    token: str = None,
    lite: bool = True,
    verbose: bool = False,
) -> Union[List[Cutout2D], List[str]]:
    """
    Takes one or more ASDF input files (`input_files`) and generates a cutout of designated
    size `cutout_size` around the given coordinates (`ra`, `dec`). The cutouts are written to
    files or returned as objects.

    This function is maintained for backwards compatibility. For maximum flexibility, we
    recommend using the ``ASDFCutout`` class directly.

    Parameters
    ----------
    input_files : list
        The input ASDF file(s), given as str, Path or S3Path entries.
    ra : float
        The right ascension of the central cutout.
    dec : float
        The declination of the central cutout.
    cutout_size : int
        Optional, default 25. The image cutout pixel size.
        Note: Odd values for `cutout_size` generally result in a cutout that is more accurately
        centered on the target coordinates compared to even values, due to the symmetry of the
        pixel grid.
    output_file : str | Path
        Optional, default "example_roman_cutout.fits". The name of the output cutout file.
        This parameter is deprecated, has no effect, and will be removed in a future version.
    write_file : bool
        Optional, default True. Flag to write the cutout to a file or not.
    fill_value: int | float
        Optional, default `np.nan`. The fill value for pixels outside the original image.
    output_dir : str | Path
        Optional, default ".". The directory to write the cutout file(s) to.
    output_format : str
        Optional, default ".asdf". The format of the output cutout file(s), either ".asdf" or
        ".fits". Case, surrounding whitespace and a missing leading dot are tolerated.
        Only used when `write_file` is True.
    key : string
        Default None. Access key ID for S3 file system. Only applicable if an input file is
        a cloud resource.
    secret : string
        Default None. Secret access key for S3 file system. Only applicable if an input file
        is a cloud resource.
    token : string
        Default None. Security token for S3 file system. Only applicable if an input file is
        a cloud resource.
    lite : bool
        Optional, default True. If True, the cutout will be created in "lite" mode, which means
        that it will only contain the data and an updated world coordinate system. If False,
        cutouts will be made from all arrays in the input file (e.g., data, error, uncertainty,
        variance, etc.) where the last two dimensions match the shape of the science data array.
        It also preserves all of the metadata from the input file.
    verbose : bool
        Default False. If True, intermediate information is printed.

    Returns
    -------
    response : list
        A list of cutout file paths if `write_file` is True, otherwise a list of
        `~astropy.nddata.Cutout2D` objects.

    Raises
    ------
    InvalidInputError
        If `write_file` is True and `output_format` is not recognized.
    """
    if write_file:
        # Get output format in standard form and validate it before doing any expensive
        # work, so a bad format fails fast instead of after all cutouts were computed.
        output_format = output_format.strip().lower()
        output_format = f".{output_format}" if not output_format.startswith(".") else output_format
        if output_format not in (".asdf", ".fits"):
            # Error if output format not recognized
            raise InvalidInputError(
                f'Output format {output_format} is not recognized. Valid options are ".asdf" and ".fits".'
            )

    asdf_cutout = ASDFCutout(
        input_files,
        f"{ra} {dec}",
        cutout_size,
        fill_value,
        key=key,
        secret=secret,
        token=token,
        lite=lite,
        verbose=verbose,
    )

    if not write_file:
        # Returns as Cutout2D objects
        return asdf_cutout.cutouts

    if output_format == ".asdf":
        return asdf_cutout.write_as_asdf(output_dir)
    return asdf_cutout.write_as_fits(output_dir)