Source code for sacc.sacc

import copy
import warnings
import os
from io import BytesIO

import numpy as np
from astropy.io import fits
from astropy.table import Table

from .tracers import BaseTracer
from .windows import BaseWindow, BandpowerWindow
from .covariance import BaseCovariance, concatenate_covariances
from .utils import unique_list
from .data_types import standard_types, DataPoint


class Sacc:
    """
    A class containing a selection of LSST summary statistic measurements,
    their covariance, and the metadata necessary to compute theoretical
    predictions for them.
    """
    def __init__(self):
        """
        Create an empty data set, ready to be built up.
        """
        self.data = []
        self.tracers = {}
        self.covariance = None
        self.metadata = {}

    def __len__(self):
        """
        Return the number of data points in the data set.

        Returns
        -------
        n: int
            The number of data points
        """
        return len(self.data)
    def copy(self):
        """
        Create a copy of the data set with no data shared with the original.
        You can safely modify the copy without it affecting the original.

        Returns
        -------
        S: Sacc instance
            A new instance of the data set.
        """
        return copy.deepcopy(self)
    def to_canonical_order(self):
        """
        Re-order the data set in-place to a standard ordering.
        """
        # Define the ordering to be used.
        # We need a key function that returns the object that python's
        # default sorted function will sort on.
        def order_key(row):
            # Put data types in the order they appear in standard_types.
            # If not present then just use the hash of the data type.
            if row.data_type in standard_types:
                dt = standard_types.index(row.data_type)
            else:
                dt = hash(row.data_type)
            # If known, order by ell or theta.
            # Otherwise just use whatever we have.
            if 'ell' in row.tags:
                return (dt, row.tracers, row.tags['ell'])
            elif 'theta' in row.tags:
                return (dt, row.tracers, row.tags['theta'])
            else:
                return (dt, row.tracers, 0.0)

        # This from
        # https://stackoverflow.com/questions/6422700/how-to-get-indices-of-a-sorted-array-in-python
        indices = [i[0] for i in sorted(enumerate(self.data),
                                        key=lambda x: order_key(x[1]))]

        # Assign the new order.
        self.reorder(indices)
    def reorder(self, indices):
        """
        Re-order the data set in-place according to the indices passed in.

        If not all indices are included in the input then the data set
        will be cut down.

        Parameters
        ----------
        indices: integer list or array
            Indices for the re-ordered data
        """
        self.data = [self.data[i] for i in indices]
        if self.has_covariance():
            self.covariance = self.covariance.keeping_indices(indices)
    #
    # Builder methods for building up Sacc data from scratch in memory
    #
    def add_tracer(self, tracer_type, name, *args, **kwargs):
        """
        Add a new tracer.

        Parameters
        ----------
        tracer_type: str
            A string corresponding to one of the known tracer types,
            or 'misc' to use a new tracer with no parameters,
            e.g. "NZ" for n(z) tracers.
        name: str
            A name for the tracer.
        *args:
            Additional arguments to pass to the tracer constructor.
            These depend on the type of the tracer.  For n(z) tracers
            these should be z and nz arrays.
        **kwargs:
            Additional keyword arguments to pass to the tracer
            constructor.  These depend on the type of the tracer.
            There are no kwargs for n(z) tracers.

        Returns
        -------
        None
        """
        tracer = BaseTracer.make(tracer_type, name, *args, **kwargs)
        self.add_tracer_object(tracer)
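    # A minimal usage sketch (not part of the original source): adding an
    # n(z) tracer.  The tracer name 'source_0' and the arrays are
    # hypothetical, made up for illustration.
    #
    #     import numpy as np
    #     s = Sacc()
    #     z = np.linspace(0.0, 2.0, 50)
    #     nz = np.exp(-0.5 * ((z - 0.8) / 0.1) ** 2)
    #     s.add_tracer('NZ', 'source_0', z, nz)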
    def add_tracer_object(self, tracer):
        """
        Add a pre-constructed BaseTracer instance to this data set.

        If you just have, for example, the z and n(z) data, use the
        add_tracer method instead.

        Parameters
        ----------
        tracer: Tracer instance
            The tracer object to add to the data set
        """
        self.tracers[tracer.name] = tracer
    def add_data_point(self, data_type, tracers, value,
                       tracers_later=False, **tags):
        """
        Add a data point to the set.

        Parameters
        ----------
        data_type: str
            The string name of the data type for this point.
        tracers: tuple of str
            Strings corresponding to zero or more of the tracers in the
            data set.  These should either already be set up using the
            add_tracer method, or you can set tracers_later=True if you
            want to add them later.  e.g. for 2pt measurements the
            tracers are the names of the two n(z) samples.
        value: float
            A single value for the data point.
        tracers_later: bool
            If True, do not complain if the tracers are not known
            already.
        **tags:
            Tags to apply to this data point.  Tags can be any arbitrary
            metadata that you might want later.  For 2pt data the tags
            would include an angle theta or ell.

        Returns
        -------
        None
        """
        if self.has_covariance():
            raise ValueError("You cannot add a data point after "
                             "setting the covariance")
        tracers = tuple(tracers)
        for tracer in tracers:
            if (tracer not in self.tracers) and (not tracers_later):
                raise ValueError(f"Tracer named '{tracer}' is not in the "
                                 "known list of tracers. "
                                 "Either put it in before adding data "
                                 "points or set tracers_later=True")
        d = DataPoint(data_type, tracers, value, **tags)
        self.data.append(d)
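    # Continuing the sketch above (not part of the original source): the
    # data type string and value are hypothetical.
    #
    #     s.add_data_point('galaxy_density_cl',
    #                      ('source_0', 'source_0'),
    #                      1.0e-5, ell=100.0)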
    def add_covariance(self, covariance, overwrite=False):
        """
        Once you have finished adding data points, add a covariance
        for the entire set.

        Parameters
        ----------
        covariance: array or list
            2D numpy array containing the covariance of the added data
            points, OR a list of blocks.
        overwrite: bool
            If True, overwrite any stored covariance matrix with the
            given one.

        Returns
        -------
        None
        """
        if self.has_covariance() and not overwrite:
            raise RuntimeError("This sacc file already contains a "
                               "covariance matrix. Use overwrite=True "
                               "if you want to replace it with the "
                               "new one")

        if isinstance(covariance, BaseCovariance):
            cov = covariance
        else:
            cov = BaseCovariance.make(covariance)

        expected_size = len(self)
        if not cov.size == expected_size:
            raise ValueError("Covariance has the wrong size. "
                             f"Should be {expected_size} but is {cov.size}")

        self.covariance = cov
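    # Continuing the sketch (not part of the original source): attach a
    # covariance once all points are added.  The diagonal values here are
    # made up purely for illustration.
    #
    #     import numpy as np
    #     n = len(s)
    #     s.add_covariance(np.diag(np.full(n, 1.0e-12)))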
    def has_covariance(self):
        """
        Return whether or not this data set has a covariance attached
        to it.

        Returns
        -------
        bool
            Whether or not a covariance has been added to this data
        """
        return self.covariance is not None
    def _indices_to_bool(self, indices):
        # Convert an array of integer indices into a boolean mask,
        # True at each listed index.
        if indices.dtype not in [np.int8, np.int16, np.int32, np.int64]:
            raise ValueError(f"Wrong indices type ({indices.dtype}) - "
                             "expected integers or boolean")
        m = np.zeros(len(self), dtype=bool)
        for i in indices:
            m[i] = True
        return m
    def keep_indices(self, indices):
        """
        Select data points, keeping only values where the mask is True
        or whose index is included in the list.

        You can use Sacc.remove_indices to do the opposite operation,
        removing the listed points instead.

        You can use the Sacc.keep_selection method to find indices and
        apply this method automatically, or the Sacc.indices method to
        select indices manually.

        Parameters
        ----------
        indices: array or list
            Either a boolean array or a list/array of integer indices
            to keep.  If boolean, True means keep a data point and
            False means cut it; if integers, the values indicate the
            data points to keep.
        """
        indices = np.array(indices)

        # Convert integer masks to booleans
        if indices.dtype != bool:
            indices = self._indices_to_bool(indices)

        self.data = [d for i, d in enumerate(self.data) if indices[i]]
        if self.has_covariance():
            self.covariance = self.covariance.keeping_indices(indices)
    def remove_indices(self, indices):
        """
        Remove data points, getting rid of points where the mask is
        True or whose index is included in the list.

        You can use Sacc.keep_indices to do the opposite operation,
        keeping only the listed points.

        You can use the Sacc.remove_selection method to find indices and
        apply this method automatically, or the Sacc.indices method to
        select indices manually.

        Parameters
        ----------
        indices: array or list
            Either a boolean array or a list/array of integer indices
            to remove.  If boolean, True means cut a data point and
            False means keep it; if integers, the values indicate the
            data points to cut.
        """
        indices = np.array(indices)

        # Convert integer masks to booleans
        if indices.dtype != bool:
            indices = self._indices_to_bool(indices)

        # Get the mask method to do the actual work
        self.keep_indices(~indices)
    def indices(self, data_type=None, tracers=None, warn_empty=True,
                **select):
        """
        Find the indices of all points matching the given selection
        criteria.

        Parameters
        ----------
        data_type: str
            Select only data points which are of this data type.
            If None (the default) then match any data types.
        tracers: tuple
            Select only data points which match this tracer combination.
            If None (the default) then match any tracer combinations.
        warn_empty: bool
            Whether to emit a warning if the selection is empty
            (default True).
        **select:
            Select only data points with tag names and values matching
            all values provided in these kwargs.  You can also use the
            syntax name__lt=value or name__gt=value to select points
            less or greater than a threshold.

        Returns
        -------
        indices: array
            Array of integer indices of matching data points
        """
        indices = []
        if tracers is not None:
            tracers = tuple(tracers)

        # Look through all data points we have
        for i, d in enumerate(self.data):
            # Skip things with the wrong type or tracer
            if not ((tracers is None) or (d.tracers == tracers)):
                continue
            if not ((data_type is None or d.data_type == data_type)):
                continue
            # Remove any objects that don't match the required tags,
            # including the fact that we can specify tag__lt and tag__gt
            # in order to remove/accept ranges
            ok = True
            for name, val in select.items():
                if name.endswith("__lt"):
                    name = name[:-4]
                    if not d.get_tag(name) < val:
                        ok = False
                        break
                elif name.endswith("__gt"):
                    name = name[:-4]
                    if not d.get_tag(name) > val:
                        ok = False
                        break
                else:
                    if not d.get_tag(name) == val:
                        ok = False
                        break
            # Record this index
            if ok:
                indices.append(i)

        if len(indices) == 0 and warn_empty:
            if tracers is None:
                warnings.warn("Empty index selected")
            else:
                warnings.warn("Empty index selected - maybe you "
                              "should check the tracer order?")

        return np.array(indices, dtype=int)
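    # Sketch of the selection syntax, including the __lt/__gt suffixes
    # (not part of the original source; data type and tracer names are
    # hypothetical, and all selected points are assumed to carry an
    # 'ell' tag):
    #
    #     ind = s.indices(data_type='galaxy_density_cl',
    #                     tracers=('source_0', 'source_0'),
    #                     ell__gt=10, ell__lt=1000)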
    def remove_selection(self, data_type=None, tracers=None,
                         warn_empty=True, **select):
        """
        Remove data points matching the given criteria.

        You can use Sacc.keep_selection to do the opposite operation,
        keeping only the points where the criteria are matched.

        You can also remove points manually using the Sacc.indices and
        Sacc.remove_indices methods.

        Parameters
        ----------
        data_type: str
            Select only data points which are of this data type.
            If None (the default) then match any data types.
        tracers: tuple
            Select only data points which match this tracer combination.
            If None (the default) then match any tracer combinations.
        warn_empty: bool
            Whether to emit a warning if the selection is empty
            (default True).
        **select:
            Select only data points with tag names and values matching
            all values provided in these kwargs.  You can also use the
            syntax name__lt=value or name__gt=value to select points
            less or greater than a threshold.
        """
        indices = self.indices(data_type=data_type, tracers=tracers,
                               warn_empty=warn_empty, **select)
        self.remove_indices(indices)
    def keep_selection(self, data_type=None, tracers=None,
                       warn_empty=True, **select):
        """
        Keep only the data points matching the given criteria, removing
        all others.

        You can use Sacc.remove_selection to do the opposite operation,
        removing the points where the criteria are matched.

        You can also select points manually using the Sacc.indices and
        Sacc.keep_indices methods.

        Parameters
        ----------
        data_type: str
            Select only data points which are of this data type.
            If None (the default) then match any data types.
        tracers: tuple
            Select only data points which match this tracer combination.
            If None (the default) then match any tracer combinations.
        warn_empty: bool
            Whether to emit a warning if the selection is empty
            (default True).
        **select:
            Select only data points with tag names and values matching
            all values provided in these kwargs.  You can also use the
            syntax name__lt=value or name__gt=value to select points
            less or greater than a threshold.
        """
        indices = self.indices(data_type=data_type, tracers=tracers,
                               warn_empty=warn_empty, **select)
        self.keep_indices(indices)
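    # Sketch of a typical scale cut followed by a data-type cut (not
    # part of the original source; the threshold and data type name are
    # hypothetical, and the points are assumed to carry an 'ell' tag):
    #
    #     s.remove_selection(ell__gt=2000)
    #     s.keep_selection(data_type='galaxy_density_cl')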
    def _get_tags_by_index(self, tags, indices):
        """
        Get the values of one or more named tags for (a subset of) the
        data.

        Parameters
        ----------
        tags: list of str
            Tags to look up on the selected data
        indices: list or array
            Indices of data points

        Returns
        -------
        values: list of lists
            For each input tag, a corresponding list of the value of
            that tag for the given selection, in the order the matching
            data points were added.
        """
        indices = set(indices)
        values = [[d.get_tag(tag)
                   for i, d in enumerate(self.data)
                   if i in indices]
                  for tag in tags]
        return values
    def get_tags(self, tags, data_type=None, tracers=None, **select):
        """
        Get the values of one or more named tags for (a subset of) the
        data.

        Parameters
        ----------
        tags: list of str
            Tags to look up on the selected data
        data_type: str
            Select only data points which are of this data type.
            If None (the default) then match any data types.
        tracers: tuple
            Select only data points which match this tracer combination.
            If None (the default) then match any tracer combinations.
        **select:
            Select only data points with tag names and values matching
            all values provided in these kwargs.  You can also use the
            syntax name__lt=value or name__gt=value to select points
            less or greater than a threshold.

        Returns
        -------
        values: list of lists
            For each input tag, a corresponding list of the value of
            that tag for the given selection, in the order the matching
            data points were added.
        """
        indices = self.indices(data_type=data_type, tracers=tracers,
                               **select)
        return self._get_tags_by_index(tags, indices)
    def get_tag(self, tag, data_type=None, tracers=None, **select):
        """
        Get the value of one tag for (a subset of) the data.

        Parameters
        ----------
        tag: str
            Tag to look up on the selected data
        data_type: str
            Select only data points which are of this data type.
            If None (the default) then match any data types.
        tracers: tuple
            Select only data points which match this tracer combination.
            If None (the default) then match any tracer combinations.
        **select:
            Select only data points with tag names and values matching
            all values provided in these kwargs.  You can also use the
            syntax name__lt=value or name__gt=value to select points
            less or greater than a threshold.

        Returns
        -------
        values: list
            A list of the value of the tag for the given selection, in
            the order the matching data points were added.
        """
        return self.get_tags([tag], data_type=data_type,
                             tracers=tracers, **select)[0]
    def get_data_points(self, data_type=None, tracers=None, **select):
        """
        Get data point objects for a subset of the data.

        Parameters
        ----------
        data_type: str
            Select only data points which are of this data type.
            If None (the default) then match any data types.
        tracers: tuple
            Select only data points which match this tracer combination.
            If None (the default) then match any tracer combinations.
        **select:
            Select only data points with tag names and values matching
            all values provided in these kwargs.  You can also use the
            syntax name__lt=value or name__gt=value to select points
            less or greater than a threshold.

        Returns
        -------
        values: list
            A list of the data point objects for the selection, in the
            order they were added.
        """
        indices = self.indices(data_type=data_type, tracers=tracers,
                               **select)
        return [self.data[i] for i in indices]
    def get_mean(self, data_type=None, tracers=None, **select):
        """
        Get mean values for each data point matching the criteria.

        Parameters
        ----------
        data_type: str
            Select only data points which are of this data type.
            If None (the default) then match any data types.
        tracers: tuple
            Select only data points which match this tracer combination.
            If None (the default) then match any tracer combinations.
        **select:
            Select only data points with tag names and values matching
            all values provided in these kwargs.  You can also use the
            syntax name__lt=value or name__gt=value to select points
            less or greater than a threshold.

        Returns
        -------
        values: array
            The mean values for each matching data point, in the order
            they were added.
        """
        indices = self.indices(data_type=data_type, tracers=tracers,
                               **select)
        return self.mean[indices]
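    # Sketch of pulling out tag and mean values for a selection (not
    # part of the original source; the data type name is hypothetical):
    #
    #     ells = s.get_tag('ell', data_type='galaxy_density_cl')
    #     mu = s.get_mean(data_type='galaxy_density_cl')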
    def get_data_types(self, tracers=None):
        """
        Get a list of the different data types stored in the Sacc.

        Parameters
        ----------
        tracers: tuple
            Select only data types which match this tracer combination.
            If None (the default) then match any tracer combinations.

        Returns
        -------
        data_types: list of strings
            A list of the string data types in the data set
        """
        data_types = unique_list(d.data_type for d in self.data
                                 if ((tracers is None) or
                                     (d.tracers == tracers)))
        return data_types
    def get_tracer(self, name):
        """
        Get the tracer object with the given name.

        Parameters
        ----------
        name: str
            A string name of a tracer

        Returns
        -------
        tracer: BaseTracer object
            The object corresponding to the name.
        """
        return self.tracers[name]
    def get_tracer_combinations(self, data_type=None):
        """
        Find all the tracer combinations (e.g. tomographic bin pairs)
        for the given data type.

        Parameters
        ----------
        data_type: str
            A string name of the data type to find

        Returns
        -------
        combinations: list of tuples of strings
            A list of all the tracer combinations found in any data
            point.  No specific ordering.
        """
        indices = self.indices(data_type=data_type)
        return unique_list(self.data[i].tracers for i in indices)
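    # Sketch of iterating over all bin pairs for one data type (not
    # part of the original source; the data type name is hypothetical):
    #
    #     for t1, t2 in s.get_tracer_combinations('galaxy_density_cl'):
    #         print(t1, t2)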
    def remove_tracers(self, names):
        """
        Remove the named tracer objects and their associated data
        points.

        Parameters
        ----------
        names: list
            A list of string names of the tracers to be removed
        """
        for trs in self.get_tracer_combinations():
            for tri in trs:
                if tri in names:
                    self.remove_selection(tracers=trs)
                    break

        for name in names:
            del self.tracers[name]
    def keep_tracers(self, names):
        """
        Keep only the named tracer objects and their associated data
        points.

        Parameters
        ----------
        names: list
            A list of string names of the tracers to be kept
        """
        for trs in self.get_tracer_combinations():
            for tri in trs:
                if tri not in names:
                    self.remove_selection(tracers=trs)
                    # Stop after removing this combination, so we don't
                    # try to remove the same (now empty) selection again.
                    break

        trs_names = list(self.tracers.keys())
        for name in trs_names:
            if name not in names:
                del self.tracers[name]
    def rename_tracer(self, name, new_name):
        """
        Rename a tracer, updating all data points that reference it.

        Parameters
        ----------
        name: str
            The current name of the tracer
        new_name: str
            The new name for the tracer
        """
        tr = self.tracers.pop(name)
        tr.name = new_name
        self.tracers[new_name] = tr

        for d in self.data:
            new_trs = []
            for tri in d.tracers:
                if tri == name:
                    tri = new_name
                new_trs.append(tri)
            d.tracers = tuple(new_trs)
    @property
    def mean(self):
        """
        Get the vector of mean values for the entire data set.

        Returns
        -------
        mean: array
            numpy array with all the mean values in the data set
        """
        return np.array([d.value for d in self.data])

    @mean.setter
    def mean(self, mu):
        """
        Set the vector of mean values for the entire data set.

        Parameters
        ----------
        mu: array
            Replacement mean values for all the data points.
        """
        if not len(mu) == len(self.data):
            raise ValueError("Tried to set mean with vector of length {}"
                             " but data is length {}".format(
                                 len(mu), len(self.data)))
        for m, d in zip(mu, self.data):
            d.value = m

    def _make_window_tables(self):
        # Convert any window objects in the data set to tables,
        # and record a mapping from those objects to table references.
        # This could easily be extended to other types.
        all_windows = unique_list(d.get_tag('window') for d in self.data)
        window_ids = {w: id(w) for w in all_windows}
        tables = BaseWindow.to_tables(all_windows)
        return tables, window_ids
    def save_fits(self, filename, overwrite=False):
        """
        Save this data set to a FITS format Sacc file.

        Parameters
        ----------
        filename: str
            Destination FITS file name
        overwrite: bool
            If False (the default), raise an error if the file already
            exists.  If True, overwrite the file silently.
        """
        # Since we don't want to re-order the file as a side effect,
        # we first make a copy of ourselves and re-order that.
        S = self.copy()
        S.to_canonical_order()

        # Tables for the windows
        tables, window_ids = S._make_window_tables()
        lookup = {'window': window_ids}

        # Tables for the tracers
        tables += BaseTracer.to_tables(S.tracers.values())

        # Tables for the data sets
        for dt in S.get_data_types():
            data = S.get_data_points(dt)
            table = DataPoint.to_table(data, lookup)
            # Could move this inside to_table?
            table.meta['SACCTYPE'] = 'data'
            table.meta['SACCNAME'] = dt
            table.meta['EXTNAME'] = f'data:{dt}'
            tables.append(table)

        # Create the actual fits object
        hdr = fits.Header()

        # Save any global metadata in the header.
        # We save the keys and values as separate header cards,
        # because otherwise the keys are all forced to upper case.
        hdr['NMETA'] = len(S.metadata)
        for i, (k, v) in enumerate(S.metadata.items()):
            hdr[f'KEY{i}'] = k
            hdr[f'VAL{i}'] = v

        hdus = [fits.PrimaryHDU(header=hdr)] + \
               [fits.table_to_hdu(table) for table in tables]

        # Covariance, if needed.
        # All the other data elements become astropy tables first,
        # but covariances are a bit more complicated and dense, so we
        # allow them to convert straight to an HDU.
        if S.covariance is not None:
            hdus.append(S.covariance.to_hdu())

        # Make and save the final FITS data
        hdu_list = fits.HDUList(hdus)

        # The astropy writeto shows very poor performance
        # when writing lots of small metadata strings on
        # the NERSC Lustre file system.  So we write to
        # a buffer first and then save that.

        # First we have to manually check for overwritten files.
        # We raise the same error as astropy.
        if os.path.exists(filename) and not overwrite:
            raise OSError(f"File {filename} already exists and "
                          "overwrite=False")

        # Create the buffer and write the data to it
        buf = BytesIO()
        hdu_list.writeto(buf)

        # Rewind and read the binary data we just wrote
        buf.seek(0)
        output_data = buf.read()

        # Write the binary data to the target file
        with open(filename, "wb") as f:
            f.write(output_data)
    @classmethod
    def load_fits(cls, filename):
        """
        Load a Sacc data set from a FITS file.

        Don't try to make these FITS files yourself - use the tools
        provided in this package to make and save them.

        Parameters
        ----------
        filename: str
            A FITS format sacc file
        """
        hdu_list = fits.open(filename, "readonly")

        # Split the HDUs into the different sacc types
        tracer_tables = [Table.read(hdu) for hdu in hdu_list
                         if hdu.header.get('SACCTYPE') == 'tracer']
        window_tables = [Table.read(hdu) for hdu in hdu_list
                         if hdu.header.get('SACCTYPE') == 'window']
        data_tables = [Table.read(hdu) for hdu in hdu_list
                       if hdu.header.get('SACCTYPE') == 'data']
        cov = [hdu for hdu in hdu_list
               if hdu.header.get('SACCTYPE') == 'cov']

        # Pull out the classes for these components.
        tracers = BaseTracer.from_tables(tracer_tables)
        windows = BaseWindow.from_tables(window_tables)

        # The lookup table is used to convert from ID numbers to
        # Window objects.
        lookup = {'window': windows}

        # Collect together all the data points from the different
        # sections
        data = []
        for table in data_tables:
            data += DataPoint.from_table(table, lookup)

        # Finally, take all the pieces that we have collected
        # and add them all into this data set.
        S = cls()
        for tracer in tracers.values():
            S.add_tracer_object(tracer)

        # Add the data points manually instead of using the API, since
        # we have already constructed them.
        for d in data:
            S.data.append(d)

        # Assume there is only a single covariance extension,
        # if there are any
        if cov:
            S.add_covariance(BaseCovariance.from_hdu(cov[0]))

        # Load metadata from the primary header
        header = hdu_list[0].header

        # Load each key, value pair in turn.
        # This will work for normal scalar data types;
        # arrays etc. will need some thought.
        n_meta = header['NMETA']
        for i in range(n_meta):
            k = header[f'KEY{i}']
            v = header[f'VAL{i}']
            S.metadata[k] = v

        hdu_list.close()
        return S
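    # Sketch of a save/load round trip (not part of the original source;
    # the file name is hypothetical):
    #
    #     s.save_fits('summary_data.fits', overwrite=True)
    #     s2 = Sacc.load_fits('summary_data.fits')
    #     assert len(s2) == len(s)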
    #
    # Methods below here are helper functions for specific types of data.
    # We can add more of them as it becomes clear what people need.
    #

    def _get_2pt(self, data_type, tracer1, tracer2, return_cov,
                 angle_name, return_ind=False):
        # Internal helper method for get_ell_cl and get_theta_xi
        ind = self.indices(data_type, (tracer1, tracer2))
        mu = np.array(self.mean[ind])
        angle = np.array(self._get_tags_by_index([angle_name], ind)[0])
        if return_cov:
            if not self.has_covariance():
                raise ValueError("This sacc data does not have "
                                 "a covariance attached")
            cov_block = self.covariance.get_block(ind)
            if return_ind:
                return angle, mu, cov_block, ind
            else:
                return angle, mu, cov_block
        else:
            if return_ind:
                return angle, mu, ind
            else:
                return angle, mu
    def get_bandpower_windows(self, indices):
        """
        Return the bandpower window functions for a given set of data
        points.  All the data points must share the same bandpower
        window.

        Parameters
        ----------
        indices: array
            Indices of the data points you want windows for.

        Returns
        -------
        windows: BandpowerWindow
            Object containing the bandpower window functions for these
            indices.
        """
        ws = unique_list(self.data[i].tags.get('window') for i in indices)
        if len(ws) != 1:
            raise ValueError("You have asked for window functions, "
                             "however, the points you have selected "
                             "have different windows associated with "
                             "them. Please narrow down your selection "
                             "(specify tracers and data type) or get "
                             "windows later.")
        ws = ws[0]
        if not isinstance(ws, BandpowerWindow):
            warnings.warn("No bandpower windows associated with "
                          "these data")
            return None
        else:
            w_inds = np.array(self._get_tags_by_index(['window_ind'],
                                                      indices)[0])
            return ws.get_section(w_inds)
    def get_ell_cl(self, data_type, tracer1, tracer2,
                   return_cov=False, return_ind=False):
        """
        Helper method to extract the ell and C_ell values for a specific
        data type (e.g. 'shear_ee') and pair of tomographic bins.

        Parameters
        ----------
        data_type: str
            Which C_ell type to extract
        tracer1: str
            The name of the first tracer, for example a tomographic
            bin name
        tracer2: str
            The name of the second tracer
        return_cov: bool
            If True, also return the block of the covariance
            corresponding to these points.  Default=False
        return_ind: bool
            If True, also return the data point indices.  Default=False

        Returns
        -------
        ell: array
            Ell values for this tracer pair
        mu: array
            Mean values for this tracer pair
        cov_block: 2D array
            (Only if return_cov=True) The block of the covariance for
            these points
        indices: array
            (Only if return_ind=True) The data point indices
        """
        return self._get_2pt(data_type, tracer1, tracer2, return_cov,
                             'ell', return_ind)
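    # Sketch of extracting a single spectrum with its covariance block
    # (not part of the original source; the data type and tracer names
    # are hypothetical):
    #
    #     ell, cl, cov = s.get_ell_cl('galaxy_shear_cl_ee',
    #                                 'source_0', 'source_1',
    #                                 return_cov=True)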
    def get_theta_xi(self, data_type, tracer1, tracer2,
                     return_cov=False, return_ind=False):
        """
        Helper method to extract the theta and correlation function
        values for a specific data type (e.g. 'shear_xi') and pair of
        tomographic bins.

        Parameters
        ----------
        data_type: str
            Which type of xi to extract
        tracer1: str
            The name of the first tracer, for example a tomographic
            bin name
        tracer2: str
            The name of the second tracer
        return_cov: bool
            If True, also return the block of the covariance
            corresponding to these points.  Default=False
        return_ind: bool
            If True, also return the data point indices.  Default=False

        Returns
        -------
        theta: array
            Theta values for this tracer pair
        mu: array
            Mean values for this tracer pair
        cov_block: 2D array
            (Only if return_cov=True) The block of the covariance for
            these points
        indices: array
            (Only if return_ind=True) The data point indices
        """
        return self._get_2pt(data_type, tracer1, tracer2, return_cov,
                             'theta', return_ind)
    def _add_2pt(self, data_type, tracer1, tracer2, x, tag_val, tag_name,
                 window, tracers_later, tag_extra=None,
                 tag_extra_name=None):
        """
        Internal method for adding 2pt data points.
        Copes with multiple values for the parameters.
        """
        # single data point case
        if np.isscalar(tag_val):
            t = {tag_name: float(tag_val)}
            if tag_extra_name is not None:
                t[tag_extra_name] = tag_extra
            if window is not None:
                t['window'] = window
            self.add_data_point(data_type, (tracer1, tracer2), x,
                                tracers_later=tracers_later, **t)
            return
        # multiple ell/theta values but same bin
        elif np.isscalar(tracer1):
            n1 = len(x)
            n2 = len(tag_val)
            if tag_extra_name is None:
                tag_extra = np.zeros(n1)
                n3 = n1
            else:
                n3 = len(tag_extra)
            if not (n1 == n2 == n3):
                raise ValueError("Lengths of inputs do not match in "
                                 f"added 2pt data ({n1},{n2},{n3})")
            if window is None:
                for tag_i, x_i, te_i in zip(tag_val, x, tag_extra):
                    self._add_2pt(data_type, tracer1, tracer2, x_i,
                                  tag_i, tag_name, window, tracers_later,
                                  te_i, tag_extra_name)
            else:
                for tag_i, x_i, w_i, te_i in zip(tag_val, x, window,
                                                 tag_extra):
                    self._add_2pt(data_type, tracer1, tracer2, x_i,
                                  tag_i, tag_name, w_i, tracers_later,
                                  te_i, tag_extra_name)
        # multiple bin values
        elif np.isscalar(data_type):
            n1 = len(x)
            n2 = len(tag_val)
            n3 = len(tracer1)
            n4 = len(tracer2)
            if tag_extra_name is None:
                tag_extra = np.zeros(n1)
                n5 = n1
            else:
                n5 = len(tag_extra)
            if not (n1 == n2 == n3 == n4 == n5):
                raise ValueError("Lengths of inputs do not match in "
                                 f"added 2pt data "
                                 f"({n1},{n2},{n3},{n4},{n5})")
            if window is None:
                for b1, b2, tag_i, x_i, te_i in zip(tracer1, tracer2,
                                                    tag_val, x,
                                                    tag_extra):
                    self._add_2pt(data_type, b1, b2, x_i, tag_i,
                                  tag_name, window, tracers_later,
                                  te_i, tag_extra_name)
            else:
                for b1, b2, tag_i, x_i, w_i, te_i in zip(tracer1,
                                                         tracer2,
                                                         tag_val, x,
                                                         window,
                                                         tag_extra):
                    self._add_2pt(data_type, b1, b2, x_i, tag_i,
                                  tag_name, w_i, tracers_later,
                                  te_i, tag_extra_name)
        # multiple data point values
        else:
            n1 = len(x)
            n2 = len(tag_val)
            n3 = len(tracer1)
            n4 = len(tracer2)
            n5 = len(data_type)
            if tag_extra_name is None:
                tag_extra = np.zeros(n1)
                n6 = n1
            else:
                n6 = len(tag_extra)
            if not (n1 == n2 == n3 == n4 == n5 == n6):
                raise ValueError("Lengths of inputs do not match in "
                                 f"added 2pt data "
                                 f"({n1},{n2},{n3},{n4},{n5},{n6})")
            if window is None:
                for d, b1, b2, tag_i, x_i, te_i in zip(data_type,
                                                       tracer1, tracer2,
                                                       tag_val, x,
                                                       tag_extra):
                    self._add_2pt(d, b1, b2, x_i, tag_i, tag_name,
                                  window, tracers_later,
                                  te_i, tag_extra_name)
            else:
                for d, b1, b2, tag_i, x_i, w_i, te_i in zip(data_type,
                                                            tracer1,
                                                            tracer2,
                                                            tag_val, x,
                                                            window,
                                                            tag_extra):
                    self._add_2pt(d, b1, b2, x_i, tag_i, tag_name, w_i,
                                  tracers_later, te_i, tag_extra_name)
    def add_ell_cl(self, data_type, tracer1, tracer2, ell, x,
                   window=None, tracers_later=False):
        """
        Add a series of 2pt Fourier space data points, either
        individually or as a group.

        Parameters
        ----------
        data_type: str or array/list of str
            Which C_ell type to add
        tracer1: str or array/list of str
            The name(s) of the first tracer, for example a tomographic
            bin name
        tracer2: str or array/list of str
            The name(s) of the second tracer
        ell: int or array/list of int/float
            The ell values for these data points
        x: float or array/list of float
            The C_ell values for these data points
        window: Window instance
            Optional window object describing the window function of
            the data points.
        tracers_later: bool
            Optional.  If False (the default), complain if n(z) tracers
            have not yet been defined; otherwise suppress this check.

        Returns
        -------
        None
        """
        if isinstance(window, BandpowerWindow):
            if len(ell) != window.nv:
                raise ValueError("Input bandpowers are misshapen")
            tag_extra = range(window.nv)
            tag_extra_name = "window_ind"
            window_use = [window for i in range(window.nv)]
        else:
            tag_extra = None
            tag_extra_name = None
            window_use = window
        self._add_2pt(data_type, tracer1, tracer2, x, ell, 'ell',
                      window_use, tracers_later, tag_extra,
                      tag_extra_name)
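    # Sketch of adding a whole spectrum at once (not part of the
    # original source; the arrays, data type and tracer names are made
    # up for illustration):
    #
    #     import numpy as np
    #     ell = np.arange(10, 1000, 10)
    #     cl = 1.0e-7 / ell
    #     s.add_ell_cl('galaxy_shear_cl_ee', 'source_0', 'source_1',
    #                  ell, cl)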
    def add_theta_xi(self, data_type, tracer1, tracer2, theta, x,
                     window=None, tracers_later=False):
        """
        Add a series of 2pt real space data points, either individually
        or as a group.

        Parameters
        ----------
        data_type: str or array/list of str
            Which xi type to add
        tracer1: str or array/list of str
            The name(s) of the first tracer, for example a tomographic
            bin name
        tracer2: str or array/list of str
            The name(s) of the second tracer
        theta: float or array/list of float
            The theta values for these data points
        x: float or array/list of float
            The correlation function values for these data points
        window: Window instance
            Optional window object describing the window function of
            the data points.
        tracers_later: bool
            Optional.  If False (the default), complain if n(z) tracers
            have not yet been defined; otherwise suppress this check.

        Returns
        -------
        None
        """
        self._add_2pt(data_type, tracer1, tracer2, x, theta, 'theta',
                      window, tracers_later)
def concatenate_data_sets(*data_sets, labels=None, same_tracers=None):
    """Combine multiple sacc data sets together into one.

    In case of two tracers or metadata items with the same name, you
    can use the labels option to pass in a list of strings to append
    to all the names.

    The covariances will be combined into either a BlockDiagonal
    covariance or a Diagonal covariance, depending on the inputs.
    Either all inputs should have a covariance attached or none of
    them.

    Parameters
    ----------
    *data_sets: Sacc objects
        The data sets to combine
    labels: List[str]
        Optional list of strings to append to tracer and metadata
        names, in case of a clash.
    same_tracers: List[str]
        Optional list of tracers that are assumed to be the same in
        the different data_sets, but with no correlation between the
        data points involving them.  Only the first occurrence of each
        such tracer is added to the combined data set.

    Returns
    -------
    output: Sacc object
        The combined data set.
    """
    # Early return of an empty data set object
    if len(data_sets) == 0:
        return Sacc()

    # Check for the wrong number of labels
    if labels is not None:
        if len(labels) != len(data_sets):
            raise ValueError("Wrong number of labels supplied when "
                             "concatenating data sets")

    # Make same_tracers an empty list for easy comparison
    if same_tracers is None:
        same_tracers = []

    data_0 = data_sets[0]

    # Either all the data sets should have covariances or none of
    # them should.  Concatenating covariances should be
    # straightforward and should always result in a block-diagonal
    # covariance.
    if data_0.has_covariance():
        if not all(data_set.has_covariance() for data_set in data_sets):
            raise ValueError("Either all concatenated data sets must "
                             "have covariances, or none of them")
    else:
        if any(data_set.has_covariance() for data_set in data_sets):
            raise ValueError("Either all concatenated data sets must "
                             "have covariances, or none of them")

    output = Sacc()

    # Copy the tracers to the new data set
    for i, data_set in enumerate(data_sets):
        for tracer in data_set.tracers.values():
            # We will be modifying the tracer, so we copy it.
            tracer = copy.deepcopy(tracer)

            # Optionally add a suffix label to avoid name clashes.
            if (labels is not None) and (tracer.name not in same_tracers):
                tracer.name = f'{tracer.name}_{labels[i]}'

            # Check for duplicate tracer names.
            # Probably this happens because the user has not provided
            # any labels to use as tracer suffixes.  But it could also
            # happen if they have chosen really really bad labels.
            if tracer.name in output.tracers:
                if tracer.name in same_tracers:
                    pass
                elif labels is None:
                    raise ValueError("There is a name clash between "
                                     "tracers in the data sets. "
                                     "Use the labels option to give "
                                     "new names to them")
                else:
                    raise ValueError("After applying your labels "
                                     "there is still a name clash "
                                     "between tracers in your "
                                     "concatenation. "
                                     "Try different labels?")
            else:
                # Build up the combined tracer collection
                output.add_tracer_object(tracer)

        for d in data_set.data:
            # Shallow copy because we do not want to clone Window
            # functions, since they are often shared.  The reason we
            # copy at all is that we may be modifying the tracer names
            # below.
            d = copy.copy(d)

            # Rename the tracers if required.  Tracers listed in
            # same_tracers keep their names, so they still match the
            # single tracer object kept above.
            if labels is not None:
                label = labels[i]
                d.tracers = tuple(t if t in same_tracers
                                  else f'{t}_{label}'
                                  for t in d.tracers)

                # Data points might reasonably have a label already,
                # but we would like to add a label from this
                # concatenation process too.  If they have both, we
                # concatenate them, joined with an underscore for
                # consistency with the tracers.
                orig_label = d.get_tag('label', '')
                d.tags['label'] = (f'{orig_label}_{label}'
                                   if orig_label else label)

            # And build up the combined data vector
            output.data.append(d)

    # Combine the covariances
    if data_sets[0].has_covariance():
        covs = [d.covariance for d in data_sets]
        cov = concatenate_covariances(*covs)
        output.add_covariance(cov)

    # Now just the metadata left.
    # It is an error if the same key appears in more than one data set
    # (unless labels are used to disambiguate).
    for i, data_set in enumerate(data_sets):
        for key, val in data_set.metadata.items():
            # Use the label as a suffix here also.
            if labels is not None:
                key = key + labels[i]
            # Check for clashing metadata
            if key in output.metadata:
                raise ValueError("Metadata in concatenated Saccs have "
                                 "the same name. Set the labels "
                                 "parameter to fix this.")
            output.metadata[key] = val

    return output
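# Sketch of combining two data sets with disambiguating labels (not
# part of the original source; the Sacc objects and labels are
# hypothetical):
#
#     combined = concatenate_data_sets(s_year1, s_year2,
#                                      labels=['y1', 'y2'])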