Source code for morpheus.classifier

# MIT License
# Copyright 2018 Ryan Hausen
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# ==============================================================================
"""An interface for interacting with Morpheus"""
import os
import time
import json
from subprocess import Popen
from typing import Iterable, List, Tuple, Callable, Dict, Union

import imageio
import numpy as np
import tensorflow.compat.v1 as tf
from import fits
from matplotlib.colors import hsv_to_rgb
from scipy import ndimage as ndi
from skimage.feature import peak_local_max
from skimage.filters import sobel
from skimage.measure import regionprops
from skimage.morphology import watershed
from tqdm import tqdm


import morpheus.core.helpers as helpers
import morpheus.core.model as model

[docs]class Classifier: """The primary interface for the use of Morpheus. Images can be classified by calling :py:meth:`~morpheus.classifier.Classifier.classify` and passing numpy arrays or string FITS file locations. After an image this this class offers some post processing functionality by generating segmentation maps using :py:meth:`~morpheus.classifier.Classifier.segmap_from_classified`, colorized morphological classifications using :py:meth:`~morpheus.classifier.Classifier.colorize_classification`, and generating catalogs using :py:meth:`~morpheus.classifier.Classifier.catalog_from_classified`. For more examples, see the `documentation <>`_. """ __graph = None __session = None __X = tf.placeholder(tf.float32, shape=[None, 40, 40, 4])
[docs] @staticmethod def classify( h: Union[np.ndarray, str] = None, j: Union[np.ndarray, str] = None, z: Union[np.ndarray, str] = None, v: Union[np.ndarray, str] = None, out_dir: str = None, batch_size: int = 1000, out_type: str = "rank_vote", gpus: List[int] = None, cpus: int = None, parallel_check_interval: float = 1, ) -> dict: """Generates per-pixel classifications from input images. Args: h (Union[np.ndarray, str]): The H band image or the path to it j (Union[np.ndarray, str]): The J band image or the path to it v (Union[np.ndarray, str]): The V band image or the path to it z (Union[np.ndarray, str]): The Z band image or the path to it out_dir (str): If provided, a directory to save the output to batch_size (int): The size of the batches to use when classifying the input out_type (str): The method by which to aggregate classifications for a single pixel. Can be one of "rank_vote", "mean_var", or "both" gpus (List[int]): The GPU ids to use for parallel classification the ids can be found using ``nvidia-smi`` cpus (int): The number of cpus to use for parallel classification. parallel_check_interval (float): If running a parallel job, how often to check on the running sub-processes in minutes. Returns: Dictionary containing the classification output for the given input Raises: ValueError if both gpus and cpus are given ValueError if mixed string and numpy arrays are given for h, j, v, z ValueError if h, j, v, or z are None """ Classifier._variables_not_none(["h", "j", "v", "z"], [h, j, v, z]) are_files = Classifier._valid_input_types_is_str(h, j, v, z) workers, is_gpu = Classifier._validate_parallel_params(gpus, cpus) if are_files: hduls, [h, j, v, z] = Classifier._parse_files(h, j, v, z) if out_dir is None: out_dir = "." else: hduls = [] if len(workers) == 1: classified = Classifier._classify_arrays( h=h, j=j, v=v, z=z, out_type=out_type, out_dir=out_dir, batch_size=batch_size, ) else: Classifier._build_parallel_classification_structure( [h, j, v, z], workers, batch_size, out_dir, out_type ) Classifier._run_parallel_jobs( workers, is_gpu, out_dir, parallel_check_interval ) Classifier._stitch_parallel_classifications(workers, out_dir, out_type) classification_hduls, classified = Classifier._retrieve_classifications( out_dir, out_type ) hduls.extend(classification_hduls) for hdul in hduls: hdul.close() return classified
[docs] @staticmethod def catalog_from_classified( classified: dict, flux: np.ndarray, segmap: np.ndarray, aggregation_scheme: Callable = None, out_file: str = None, ) -> List[Dict]: """Creates a catalog of sources and their morphologies. Args: classified (dict): A dictionary containing the output from morpheus. flux (np.ndarray): The corresponding flux image in H band segmap (np.ndarray): A labeled segmap where every pixel with a value > 0 is associated with a source. aggregation_scheme (func): Function that takes three arguments `classified`, `flux`, and `segmap`, same as this function, then returns a numpy array containing the morphological classification in the following order-spheroid, disk, irregular, and point source/compact. If None, then the flux weighting scheme in out_file (str): a location to save the catalog. Can either be .csv or .json. Anything else will raise a ValueError. Returns: A JSON-compatible list of dictionary objects with the following keys: { 'id': the id from the segmap 'location': a (y,x) location -- the max pixel within the segmap 'morphology': a dictionary containing the morphology values. } """ if out_file: if out_file.endswith((".csv", ".json")): is_csv = out_file.endswith(".csv") else: raise ValueError("out_file must end with .csv or .json") if aggregation_scheme is None: aggregation_scheme = Classifier.aggregation_scheme_flux_weighted catalog = [] for region in regionprops(segmap, flux): _id = region.label if _id < 1: continue img = region.intensity_image seg = region.filled_image start_y, start_x, end_y, end_x = region.bbox dat = {} for k in classified: dat[k] = classified[k][start_y:end_y, start_x:end_x].copy() classification = aggregation_scheme(dat, img, seg) masked_flux = img * seg # y, x = np.unravel_index(masked_flux.argmax(), masked_flux.shape) y, x = int(start_y + y), int(start_x + x) catalog.append( {"id": _id, "location": [y, x], "morphology": classification} ) if out_file: with open(out_file, "w") as f: if is_csv: f.write("source_id,y,x,sph,dsk,irr,ps\n") for c in catalog: csv = "{},{},{},{},{},{},{}\n" f.write( csv.format( c["id"], c["location"][0], c["location"][1], c["morphology"][0], c["morphology"][1], c["morphology"][2], c["morphology"][3], ) ) else: json.dump(catalog, f) return catalog
# TODO: make the output file with the FITS helper if the output dir is used.
[docs] @staticmethod def segmap_from_classified( classified: dict, flux: np.ndarray, bkg_src_threshold: float = 0.0, out_dir: str = None, min_distance: int = 20, mask: np.ndarray = None, deblend: bool = True, ) -> np.ndarray: """Generate a segmentation map from the classification output. For more information about the segmentation process, see: Args: data (dict): A dictionary containing the output from morpheus. flux (np.ndarray): The flux to use when making the segmap bkg_src_threshold (float): The max value that a background classification pixel can take and be considered a source. The default is 0. Should be between [0,1] out_dir (str): A path to save the segmap in. min_distance (int): The minimum distance for deblending mask (np.ndarry): A boolean mask indicating which pixels deblend (bool): If ``True``, perform deblending as described in 2. in the algorithm description. If ``False`` return segmap without deblending. Returns: A np.ndarray segmentation map """ if bkg_src_threshold < 0 or bkg_src_threshold >= 1: err_msg = [ "Invalid value for `bkg_src_threshold`, use a value in the ", "range [0, 1)", ] raise ValueError(err_msg) bkg = classified["background"] markers = np.zeros_like(flux, dtype=np.uint8) print("Building Markers...") if mask is None: mask = classified["n"] > 0 is_bkg = np.logical_and(bkg == 1, mask) is_src = np.logical_and(bkg <= bkg_src_threshold, mask) markers[is_bkg] = 1 markers[is_src] = 2 sobel_img = sobel(bkg) print("Watershedding...") segmented = watershed(sobel_img, markers, mask=mask) - 1 segmented[np.logical_not(mask)] = 0 labeled, _ = ndi.label(segmented) labeled[np.logical_not(mask)] = -1 if deblend: labeled = Classifier._deblend(labeled, flux, min_distance) if out_dir: fits.PrimaryHDU(data=labeled).writeto(os.path.join(out_dir, "segmap.fits")) return labeled
[docs] @staticmethod def colorize_classified( classified: dict, out_dir: str = None, hide_unclassified: bool = True ) -> np.ndarray: """Makes a color image from the classification output. The colorization scheme is defined in HSV and is as follows: * Spheroid = Red * Disk = Blue * Irregular = Green * Point Source = Yellow The hue is set to be the color associated with the highest ranked class for a given pixel. The saturation is set to be the difference between the highest ranked class and the second highest ranked class for a given pixel. For example, if the top two classes have nearly equal values given by the classifier, then the saturation will be low and the pixel will appear more white. If the top two classes have very different values, then the saturation will be high and the pixel's color will be vibrant and not white. The value for a pixel is set to be 1-bkg, where bkg is value given to the background class. If the background class has a high value, then the pixel will appear more black. If the background value is low, then the pixel will take on the color given by the hue and saturation values. Args: data (dict): A dictionary containing the output from Morpheus. out_dir (str): a path to save the image in. hide_unclassified (bool): If true, black out the edges of the image that are unclassified. If false, show the borders as white. Returns: A [width, height, 3] array representing the RGB image. """ red = 0.0 # spheroid blue = 0.7 # disk yellow = 0.18 # point source green = 0.3 # irregular shape = classified["n"].shape colors = np.array([red, blue, green, yellow]) morphs = np.dstack( [classified[i] for i in helpers.LabelHelper.MORPHOLOGIES[:-1]] ) ordered = np.argsort(-morphs, axis=-1) hues = np.zeros(shape) sats = np.zeros(shape) vals = 1 - classified["background"] # the classifier doesn't return values for this area so black it out if hide_unclassified: vals[0:5, :] = 0 vals[-5:, :] = 0 vals[:, 0:5] = 0 vals[:, -5:] = 0 for i in tqdm(range(shape[0])): for j in range(shape[1]): hues[i, j] = colors[ordered[i, j, 0]] sats[i, j] = ( morphs[i, j, ordered[i, j, 0]] - morphs[i, j, ordered[i, j, 1]] ) hsv = np.dstack([hues, sats, vals]) rgb = hsv_to_rgb(hsv) if out_dir: png = (rgb * 255).astype(np.uint8) imageio.imwrite(os.path.join(out_dir, "colorized.png"), png) return rgb
@staticmethod def _retrieve_classifications( out_dir: str, out_type: str ) -> Tuple[List[fits.HDUList], dict]: f_names = [] for morph in helpers.LabelHelper.MORPHOLOGIES: if out_type in ["mean_var", "both"]: f_names.extend( [ os.path.join(out_dir, f"{morph}_mean.fits"), os.path.join(out_dir, f"{morph}_var.fits"), ] ) if out_type in ["rank_vote", "both"]: f_names.append(os.path.join(out_dir, f"{morph}.fits")) f_names.append(os.path.join(out_dir, "n.fits")) hduls, arrs = helpers.FitsHelper.get_files(f_names) classified = { os.path.split(n)[1].replace(".fits", ""): a for n, a in zip(f_names, arrs) } return hduls, classified @staticmethod def _valid_input_types_is_str( h: Union[np.ndarray, str] = None, j: Union[np.ndarray, str] = None, z: Union[np.ndarray, str] = None, v: Union[np.ndarray, str] = None, ): in_types = {type(val) for val in [h, j, z, v]} if len(in_types) > 1: raise ValueError( "Mixed input type usuage. Ensure all are numpy arrays or strings." ) t = in_types.pop() if t in [np.ndarray, str]: return t == str else: raise ValueError("Input type must either be numpy array or string") # NEW API ================================================================== @staticmethod def _classify_arrays( h: np.ndarray = None, j: np.ndarray = None, z: np.ndarray = None, v: np.ndarray = None, out_dir: str = None, batch_size: int = 1000, out_type: str = "rank_vote", ) -> Dict: """Classify numpy arrays using Morpheus. Args: h (np.ndarray): the H band values for an image j (np.ndarray): the J band values for an image z (np.ndarray): the Z band values for an image v (np.ndarray): the V band values for an image out_dir (str): The location where to save the output files if None returns the output in memory only. batch_size (int): the number of image sections blackto process at a time out_type (str): how to process the output from Morpheus. If 'mean_var' record output using mean and variance, If 'rank_vote' record output as the normalized vote count. If 'both' record both outputs. Returns: A dictionary containing the output classifications. Raises: ValueError if out_type is not one of ['mean_var', 'rank_vote', 'both'] """ Classifier._variables_not_none(["h", "j", "z", "v"], [h, j, z, v]) Classifier._arrays_same_size([h, j, z, v]) if out_type not in ["mean_var", "rank_vote", "both"]: raise ValueError("Invalid value for `out_type`") mean_var = out_type in ["mean_var", "both"] rank_vote = out_type in ["rank_vote", "both"] shape = h.shape hduls = [] data = {} if out_dir: if mean_var: hs, ds = helpers.FitsHelper.create_mean_var_files(shape, out_dir) hduls.extend(hs) data.update(ds) if rank_vote: hs, ds = helpers.FitsHelper.create_rank_vote_files(shape, out_dir) hduls.extend(hs) data.update(ds) hs, ds = helpers.FitsHelper.create_n_file(shape, out_dir) hduls.extend(hs) data.update(ds) else: if mean_var: data.update(helpers.LabelHelper.make_mean_var_arrays(shape)) if rank_vote: data.update(helpers.LabelHelper.make_rank_vote_arrays(shape)) data.update(helpers.LabelHelper.make_n_array(shape)) indicies = helpers.LabelHelper.windowed_index_generator(*shape) window_y, window_x = helpers.LabelHelper.UPDATE_MASK_N.shape batch_estimate = shape[0] - window_y + 1 batch_estimate *= shape[1] - window_x + 1 batch_estimate = batch_estimate // batch_size pbar = tqdm(total=batch_estimate, desc="classifying", unit="batch") while True: batch = [] batch_idx = [] for _ in range(batch_size): try: y, x = next(indicies) except StopIteration: break combined = np.array( [img[y : y + window_y, x : x + window_x] for img in [h, j, v, z]] ) batch.append(Classifier._standardize_img(combined)) batch_idx.append((y, x)) if not batch: break batch = np.array(batch) labels = Classifier._call_morpheus(batch) helpers.LabelHelper.update_labels(data, labels, batch_idx, out_type) pbar.update() if rank_vote: helpers.LabelHelper.finalize_rank_vote(data) for hdul in hduls: hdul.close() return data @staticmethod def _standardize_img(img: np.ndarray) -> np.ndarray: """Standardizes an input img to mean 0 and unit variance. Uses the formula described in: Args: img (np.ndarray): the input array to standardize Returns: The standardized input """ num = img - img.mean() denom = max(img.std(), 1 / np.sqrt( return num / denom @staticmethod def _arrays_same_size(arrays: List[np.ndarray]) -> None: """Verifies that all arrays are the same shape. Args: arrays (List[np.ndarray]): List of arrays that should have the same shape. Returns: None Raises: ValueError if arrays are not the same shape """ arr_shapes = [a.shape for a in arrays] arr_comp = arr_shapes[0] arr_to_comp = arr_shapes[1:] if not np.array_equiv(arr_comp, arr_to_comp): raise ValueError(f"All shapes not the same: {arr_shapes}.") @staticmethod def _variables_not_none(names: List[str], values: List[np.ndarray]) -> None: """Verifies that all variables are not None. Args: names (List[str]): list of names of variables in the same order as `values` names (List[np.ndarray]): list of numpy arrays that should not be None Returns: None Raises: ValueError if a variable is None """ nones = [] for name, value in zip(names, values): if value is None: nones.append(name) if nones: raise ValueError("{} should not be None".format(nones)) @staticmethod def _parse_files( h: str, j: str, v: str, z: str ) -> Tuple[List[fits.HDUList], List[np.ndarray]]: """Validates that files exist. And returns the corresponding arrays. Args: h (str): the file location of the H band img j (str): the file location of the J band img v (str): the file location of the V band img z (str): the file location of the Z bnad img Returns: A tuple containing the a (List[HDUL], List[np.ndarray]) Raises: ValueError if a variable is None """ Classifier._variables_not_none(["h", "j", "z", "v"], [h, j, z, v]) return helpers.FitsHelper.get_files([h, j, v, z]) @staticmethod def _call_morpheus(batch: np.ndarray) -> np.ndarray: """Use morpheus to classify a batch of input values. Morpheus is called as a singleton using this method. Args: batch (np.ndarray): The input data in the shape [batch, channels, width, height] Returns: The classified numpy array with shape [batch, width, height, channels] """ batch = np.transpose(batch, axes=[0, 2, 3, 1]) if Classifier.__graph is None: config = model.Morpheus.inference_hparams() inference_dataset = model.Morpheus.mock_dataset() # build graph m = model.Morpheus(config, inference_dataset, "channels_last") Classifier.__graph = m.inference(Classifier.__X) # get weights saver = tf.train.Saver() Classifier.__session = tf.Session() w_location = model.Morpheus.get_weights_dir() saver.restore(Classifier.__session, tf.train.latest_checkpoint(w_location)) return Classifier.__graph, feed_dict={Classifier.__X: batch} ) @staticmethod def _get_split_length(shape: List[int], num_workers: int) -> int: """Calculate the size of the sub images for classification. Args: shape (List[int]): the shape of the array to be split num_workers (int): the number of splits to make Returns: The length of each split along axis 0 TODO: Implement splits along other axes """ return (shape[0] + (num_workers - 1) * 40) // num_workers @staticmethod def _get_split_slice_generator( shape: Tuple[int], num_workers: int, slice_length: int ) -> Iterable[slice]: """Creates a generator that yields `slice` objects to split imgs. Args: shape (Tuple[int]): The shape of the array to be split num_workers (int): The number of splits to make split_length (int): The length each slice should be Returns A generator that yields slice objects TODO: Implement splits along other axes """ idx = 0 for i in range(num_workers): start_idx = max(idx - 39, 0) if i == num_workers - 1: end_idx = shape[0] else: end_idx = start_idx + slice_length - 1 idx = end_idx yield slice(start_idx, end_idx) @staticmethod def _make_runnable_file( path: str, batch_size: int = 1000, out_type: str = "rank_vote" ) -> None: """Creates a file at `path` that classfies local FITS files. Args: path (str): The dir to save the file in batch_size (int): The batch size for Morpheus to use when classifying the input out_type (str): how to process the output from Morpheus. If 'mean_var' record output using mean and variance, If 'rank_vote' record output as the normalized vote count. If 'both' record both outputs. Returns: None """ local = os.path.dirname(os.path.dirname(__file__)) text = [ "import sys", f'sys.path.append("{local}")', "import os", "import numpy as np", "from tqdm import tqdm", "from morpheus.classifier import Classifier", "def main():", " data_dir = '.'", " output_dir = './output'", " if 'output' not in os.listdir():", " os.mkdir('./output')", " files = {", " 'h':os.path.join(data_dir, 'h.fits'),", " 'j':os.path.join(data_dir, 'j.fits'),", " 'v':os.path.join(data_dir, 'v.fits'),", " 'z':os.path.join(data_dir, 'z.fits')", " }", " Classifier.classify(h=files['h'],", " j=files['j'],", " v=files['v'],", " z=files['z'],", f" batch_size={batch_size},", f' out_type="{out_type}",', " out_dir=output_dir)", " sys.exit(0)", "if __name__=='__main__':", " main()", ] with open(os.path.join(path, ""), "w") as f: f.write("\n".join(text)) @staticmethod def _build_parallel_classification_structure( arrs: List[np.ndarray], workers: List[int], batch_size: int, out_dir: str, out_type: str, ) -> None: """Sets up the subdirs and files to run the parallel classification. Args: arrs (List[np.ndarray]): List of arrays to split up in the order HJVZ workers (List[int]): A list of worker ID's that can either be CUDA GPU ID's or a list dummy numbers for cpu workers batch_size (int): The batch size for Morpheus to use when classifying the input. out_dir (str): the location to place the subdirs in Returns: None """ shape = arrs[0].shape num_workers = len(workers) split_slices = Classifier._get_split_slice_generator( shape, num_workers, Classifier._get_split_length(shape, num_workers) ) for worker, split_slice in tqdm(zip(sorted(workers), split_slices)): sub_output_dir = os.path.join(out_dir, str(worker)) os.mkdir(sub_output_dir) for name, data in zip(["h", "j", "v", "z"], arrs): tmp_location = os.path.join(sub_output_dir, "{}.fits".format(name)) fits.PrimaryHDU(data=data[split_slice, :]).writeto(tmp_location) Classifier._make_runnable_file(sub_output_dir, batch_size, out_type) @staticmethod def _stitch_parallel_classifications( workers: List[int], out_dir: str, out_type: str ) -> None: """Stitch the seperate outputs made from the parallel classifications. Args: workers (List[int]): A list of worker ID's that can either be CUDA GPU ID's or a list dummy numbers for cpu workers out_dir (str): the location that contains the parallel classified subdirs out_type (str): how to process the output from Morpheus. If 'mean_var' record output using mean and variance, If 'rank_vote' record output as the normalized vote count. If 'both' record both outputs. Returns: None """ jobs = [] if out_type in ["mean_var", "both"]: jobs.append("mean_var") if out_type in ["rank_vote", "both"]: jobs.append("rank_vote") for morph in helpers.LabelHelper.MORPHOLOGIES: for job in jobs: if job == "mean_var": to_be_stitched = [] for worker_id in workers: # each worker was assinged a dir by id dir_list = [out_dir, str(worker_id), "output"] f_mean = os.path.join(*(dir_list + [f"{morph}_mean.fits"])) f_var = os.path.join(*(dir_list + [f"{morph}_var.fits"])) f_n = os.path.join(*(dir_list + ["n.fits"])) to_be_stitched.append( ( fits.getdata(f_mean), fits.getdata(f_var), fits.getdata(f_n), ) ) new_y = sum(t[0].shape[0] for t in to_be_stitched) new_y -= 39 * (len(to_be_stitched) - 1) new_x = to_be_stitched[0][0].shape[1] combined_mean = np.zeros(shape=[new_y, new_x], dtype=np.float32) combined_var = np.zeros(shape=[new_y, new_x], dtype=np.float32) combined_n = np.zeros(shape=[new_y, new_x], dtype=np.float32) start_y = 0 for new_mean, new_var, new_n in to_be_stitched: Classifier._merge_parallel_means_vars( combined_mean, combined_var, combined_n, new_mean, new_var, new_n, start_y, ) start_y += new_n.shape[0] - 39 to_write = [ (combined_mean, f"{morph}_mean.fits"), (combined_var, f"{morph}_var.fits"), (combined_n, "n.fits"), ] for f, n in to_write: fits.PrimaryHDU(data=f).writeto( os.path.join(out_dir, n), overwrite=True ) if job == "rank_vote": to_be_stitched = [] for worker_id in workers: # each worker was assinged a dir by id dir_list = [out_dir, str(worker_id), "output"] f_votes = os.path.join(*(dir_list + [f"{morph}.fits"])) f_n = os.path.join(*(dir_list + ["n.fits"])) to_be_stitched.append( (fits.getdata(f_votes), fits.getdata(f_n)) ) new_y = sum(t[0].shape[0] for t in to_be_stitched) new_y -= 39 * (len(to_be_stitched) - 1) new_x = to_be_stitched[0][0].shape[1] combined_votes = np.zeros(shape=[new_y, new_x], dtype=np.float32) combined_n = np.zeros(shape=[new_y, new_x], dtype=np.float32) start_y = 0 for new_votes, new_n in to_be_stitched: Classifier._merge_parallel_rank_votes( combined_votes, combined_n, new_votes, new_n, start_y ) start_y += new_n.shape[0] - 39 to_write = [ (combined_votes, f"{morph}.fits"), (combined_n, "n.fits"), ] for f, n in to_write: fits.PrimaryHDU(data=f).writeto( os.path.join(out_dir, n), overwrite=True ) @staticmethod def _merge_parallel_means_vars( total_mean: np.ndarray, total_var: np.ndarray, total_n: np.ndarray, new_mean: np.ndarray, new_var: np.ndarray, new_n: np.ndarray, y_idx: int, ) -> None: """Merge merge means/vars from a new piece to total. Derived from: Args: total (np.ndarray): The array of means to add ``new`` to total_n (np.ndarray): The array of counts to add ``new_n`` to new (np.ndarray): the new means to add to ``total`` new_n (np.ndarray): the new counts to add to ``total`` y_idx (int): index for placement of ``new`` into ``total`` along y axis Returns: None """ ys = slice(y_idx, y_idx + new_mean.shape[0]) x1, x2 = total_mean[ys, :].copy(), new_mean.copy() s1, s2 = total_var[ys, :].copy(), new_var.copy() n1, n2 = total_n[ys, :].copy(), new_n.copy() denominator = n1 + n2 xc_numerator = n1 * x1 + n2 * x2 xc = np.where(denominator > 0, xc_numerator / denominator, 0) sc_numerator = (n1 * (s1 + np.square(x1 - xc))) + ( n2 * (s2 + np.square(x2 - xc)) ) sc = np.where(denominator > 0, sc_numerator / denominator, 0) total_mean[ys, :] = xc total_var[ys, :] = sc total_n[ys, :] = denominator @staticmethod def _merge_parallel_rank_votes( total_votes: np.ndarray, total_n: np.ndarray, new_votes: np.ndarray, new_n: np.ndarray, y_idx: int, ) -> None: """Merge vote counts from a new piece to total Args: total_count (np.ndarray): The array of votes to add ``new`` to total_n (np.ndarray): The array of counts to add ``new_n`` to new_votes (np.ndarray): The array of votes to add to ``total`` new_n (np.ndarray): The array of counts to add to ``new`` y_idx (int): index for placement pf ``new`` into ``total`` along y axis Returns: None """ ys = slice(y_idx, y_idx + new_votes.shape[0]) x1, x2 = total_votes[ys, :].copy(), new_votes.copy() n1, n2 = total_n[ys, :].copy(), new_n.copy() numerator = (n1 * x1) + (n2 * x2) denominator = n1 + n2 mean = np.where(denominator > 0, numerator / denominator, 0) total_votes[ys, :] = mean total_n[ys, :] = denominator # TODO: Add an informative output. @staticmethod def _run_parallel_jobs( workers: List[int], is_gpu: bool, out_dir: str, parallel_check_interval: float ) -> None: """Starts and tracks parallel job runs. WARNING: This will not finish running until all subprocesses are complete Args: workers (List[int]): A list of worker ID's to assign to a portion of an image. is_gpu (bool): if True the worker ID's belong to NVIDIA GPUs and will be used as an argument in CUDA_VISIBLE_DEVICES. If False, then the ID's are assocaited with CPU workers out_dir (str): the location with the partitioned data parallel_check_interval (float): If gpus are given, then this is the number of minutes to wait between polling each subprocess for completetion Returns: None """ processes = {} for worker in workers: if is_gpu: cmd_string = f"CUDA_VISIBLE_DEVICES={worker} python" else: cmd_string = f"CUDA_VISIBLE_DEVICES=-1 python" sub_dir = os.path.join(out_dir, str(worker)) processes[worker] = Popen(cmd_string, shell=True, cwd=sub_dir) is_running = np.ones([len(workers)], dtype=np.bool) while is_running.any(): for i, g in enumerate(sorted(workers)): if is_running[i] and (processes[g].poll() is not None): is_running[i] = False if is_running.any(): time.sleep(parallel_check_interval * 60) else: # we're done we can skip sleep break @staticmethod def _validate_parallel_params( gpus: List[int] = None, cpus: int = None ) -> Tuple[List[int], bool]: """Validates that the parallelism scheme. Only one of the arguments should be given. Args: gpus (List[int]): A list of the CUDA gpu ID's to use for a parallel classification. cpus (int): Number of cpus to use foa a parallel classification Returns: A tuple containing the list of worker ids and a boolean indicating wheter or not the ids belong to GPUS Raises: ValueError if both cpus and gpus are not None """ # invalid params if (gpus is not None) and (cpus is not None): raise ValueError("Please only give a value cpus or gpus, not both.") # Simple serial run if (gpus is None) and (cpus is None): return [0], False if gpus is not None: if len(gpus) == 1: err = "Only one gpus indicated. If you are trying to select " err += "a single gpu, then use the CUDA_VISIBLE_DEVICES environment " err += "variable. For more information visit: " err += "" raise ValueError(err) else: return gpus, True else: if cpus < 2: raise ValueError( "If passing cpus please indicate a value greater than 1." ) return np.arange(cpus), False @staticmethod def _deblend(segmap: np.ndarray, flux: np.ndarray, min_distance: int) -> np.ndarray: """Deblends a segmentation map according to the description in make_segmap. Args: segmap (np.ndarray): The segmentation map image to deblend flux (np.ndarray): The corresponding flux image in H band min_distance (int): The radius of the PSF for the instrument used on H band Returns: A np.ndarray representing the deblended segmap """ max_id = segmap.max() for region in tqdm(regionprops(segmap, flux), desc="Deblending"): # greater than 1 indicates that the region is not background if region.label > 0: flx = region.intensity_image seg = region.filled_image flux_map = flx * seg maxes = peak_local_max( flux_map, min_distance=min_distance, num_peaks=20 ) # more than 1 source found, deblend if maxes.shape[0] > 1: start_y, start_x, end_y, end_x = region.bbox markers = np.zeros_like(seg, for y, x in maxes: max_id += 1 markers[y, x] = max_id deblended = watershed(-flux_map, markers, mask=seg) local_segmap = segmap[start_y:end_y, start_x:end_x].copy() local_segmap = np.where(seg, deblended, local_segmap) segmap[start_y:end_y, start_x:end_x] = local_segmap return segmap
[docs] @staticmethod def aggregation_scheme_flux_weighted( data: dict, flux: np.ndarray, segmap: np.ndarray ) -> List[float]: """Aggregates pixel level morphological classifications to the source level. Uses a flux-weighted mean of the pixel level morphologies to calculate the aggregate source level morphology. Args: data (dict): A dictionary containing the output from morpheus. flux (np.ndarray): The corresponding flux image in H band segmap (int): The binary map indicating pixels that belong to the source Returns: The morphological classification as a list of floats in the following order: ['spheroid', 'disk', 'irregular', 'point source'] """ classifications = np.zeros([4]) morphs = ["spheroid", "disk", "irregular", "point_source"] morphs = [data[m] for m in morphs] for i, m in enumerate(morphs): classifications[i] = np.mean(m[segmap] * flux[segmap]) return (classifications / classifications.sum()).tolist()