Source code for schedview.collect.opsim

import sqlite3
from warnings import warn

import pandas as pd
import rubin_scheduler
import yaml
from astropy.time import Time
from lsst.resources import ResourcePath
from rubin_scheduler.scheduler.utils import SchemaConverter
from rubin_scheduler.utils import ddf_locations

# rubin_sim is an optional dependency: if maf is unavailable, read_opsim
# falls back to querying the database directly (see the NameError handler
# there).
try:
    from rubin_sim import maf
except ModuleNotFoundError:
    pass


def all_visits_columns():
    """Return all visits columns understood by the current rubin_scheduler."""
    schema_converter = SchemaConverter()
    current_cols = set(schema_converter.convert_dict.keys())
    backwards_cols = set(schema_converter.backwards.keys())
    return current_cols.union(backwards_cols)
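
# A minimal usage sketch, not part of the module: the returned set covers
# both current and legacy schema names, so it can be used to screen database
# columns before querying.
#
#     known_columns = all_visits_columns()
#     print(f"{len(known_columns)} known visit column names")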


def _normalize_opsim_columns(
    opsim_rp: ResourcePath, dbcols: list[str]
) -> tuple[list[str], dict[str, str]]:
    """Map requested column names to those present in an opsim database.

    At least one opsim column has been renamed since the start of
    simulations. The mapping between new and old names can be found in
    ``rubin_scheduler.scheduler.utils.SchemaConverter.backwards``.

    We want our queries to work the same whether run on a simulation from
    before or after the name change. So, if a requested column is missing,
    see if it is the renamed value of one that used to exist, and if so,
    use the old column name instead.

    In addition to returning the list of columns with any replaced column
    names, return the mapping so that a table's column headings can be
    updated from the old names used to the new (requested) names.
    """
    with opsim_rp.as_local() as local_obs_path:
        with sqlite3.connect(local_obs_path.ospath) as sim_connection:
            query = "SELECT name FROM PRAGMA_TABLE_INFO('observations');"
            present_columns = set(pd.read_sql(query, sim_connection).name.values)

    new_columns = []
    used_column_map = {}
    backwards_column_map = {v: k for k, v in SchemaConverter().backwards.items()}
    for column in dbcols:
        if column in present_columns:
            new_columns.append(column)
        elif column in backwards_column_map:
            old_column = backwards_column_map[column]
            if old_column in present_columns:
                warn(f"Column {column} not found in {opsim_rp}, using deprecated {old_column} instead")
                used_column_map[old_column] = column
                new_columns.append(old_column)
            else:
                warn(f"Neither column {column} nor deprecated {old_column} found in {opsim_rp}, skipping.")
        else:
            warn(f"Column {column} not found in {opsim_rp}, skipping.")

    return new_columns, used_column_map
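
# A hedged usage sketch (the database path is hypothetical): against a
# pre-rename database, asking for a modern name such as "target_name" yields
# the legacy name plus a mapping back to the requested heading.
#
#     cols, col_map = _normalize_opsim_columns(
#         ResourcePath("old_opsim.db"), ["observationStartMJD", "target_name"]
#     )
#     # Expect cols == ["observationStartMJD", "target"] and
#     # col_map == {"target": "target_name"}, assuming "target" is the
#     # deprecated name recorded in SchemaConverter().backwards.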


def read_opsim(
    opsim_uri,
    start_time=None,
    end_time=None,
    constraint=None,
    dbcols=None,
    **kwargs,
):
    """Read visits from an opsim database.

    Parameters
    ----------
    opsim_uri : `str`
        The uri from which to load visits.
    start_time : `str` or `astropy.time.Time`
        The start time for visits to be loaded.
    end_time : `str` or `astropy.time.Time`
        The end time for visits to be loaded.
    constraint : `str` or `None`
        Query for which visits to load.
    dbcols : `None` or `list` [`str`]
        Columns required from the database. Defaults to None, which queries
        all columns known to rubin_scheduler.
    **kwargs
        Passed to `maf.get_sim_data`, if `rubin_sim` is available.

    Returns
    -------
    visits : `pandas.DataFrame`
        The visits and their parameters.
    """
    # Add constraints corresponding to requested start and end times.
    if (start_time is not None) or (end_time is not None):
        if constraint is None:
            constraint = ""

        if start_time is not None:
            if len(constraint) > 0:
                constraint += " AND "
            constraint += f"(observationStartMJD >= {Time(start_time).mjd})"

        if end_time is not None:
            if len(constraint) > 0:
                constraint += " AND "
            constraint += f"(observationStartMJD <= {Time(end_time).mjd})"

    original_resource_path = ResourcePath(opsim_uri)

    if original_resource_path.isdir():
        # If we were given a directory, look for a metadata file in the
        # directory, and look up in it what file to load observations from.
        metadata_path = original_resource_path.join("sim_metadata.yaml")
        sim_metadata = yaml.safe_load(metadata_path.read().decode("utf-8"))
        obs_basename = sim_metadata["files"]["observations"]["name"]
        obs_path = original_resource_path.join(obs_basename)
    else:
        # Otherwise, assume we were given the path to the observations file.
        obs_path = original_resource_path

    with obs_path.as_local() as local_obs_path:
        with sqlite3.connect(local_obs_path.ospath) as sim_connection:
            if dbcols is None:
                col_query = "SELECT name FROM PRAGMA_TABLE_INFO('observations')"
                raw_dbcols = [
                    c for c in pd.read_sql(col_query, sim_connection).name if c in all_visits_columns()
                ]
                # Update any outdated column names.
                backwards = SchemaConverter().backwards
                dbcols = [(backwards[c] if c in backwards else c) for c in raw_dbcols]

            norm_columns, used_column_map = _normalize_opsim_columns(obs_path, dbcols)

            try:
                try:
                    visits = pd.DataFrame(
                        maf.get_sim_data(sim_connection, constraint, norm_columns, **kwargs)
                    )
                except UserWarning:
                    warn("No visits match constraints.")
                    visits = (
                        SchemaConverter()
                        .obs2opsim(rubin_scheduler.scheduler.utils.ObservationArray())
                        .iloc[0:-1]
                    )
                if "observationId" not in visits.columns and "ID" in visits.columns:
                    visits.rename(columns={"ID": "observationId"}, inplace=True)
            except NameError as e:
                # rubin_sim is not installed, so maf is undefined: fall back
                # to querying the database directly.
                if e.name == "maf" and e.args == ("name 'maf' is not defined",):
                    if len(kwargs) > 0:
                        raise NotImplementedError(
                            f"Argument {list(kwargs)[0]} not supported without rubin_sim installed"
                        )
                    query = f'SELECT {", ".join(norm_columns)} FROM observations'
                    if constraint:
                        query += f" WHERE {constraint}"
                    visits = pd.read_sql(query, sim_connection)
                else:
                    raise e

    # If we replaced modern columns with legacy ones in the query,
    # update the column names.
    visits.rename(columns=used_column_map, inplace=True)

    if "start_date" not in visits:
        if "observationStartDatetime64" in visits:
            visits["start_date"] = pd.to_datetime(visits.observationStartDatetime64, unit="ns", utc=True)
        elif "observationStartMJD" in visits:
            visits["start_date"] = pd.to_datetime(
                visits.observationStartMJD + 2400000.5, origin="julian", unit="D", utc=True
            )

    visits.set_index("observationId", inplace=True)

    return visits
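
# A minimal usage sketch, not part of the module; the file name and dates
# are hypothetical:
#
#     visits = read_opsim(
#         "baseline.db",
#         start_time="2026-05-01",
#         end_time="2026-05-02",
#     )
#
# The result is a DataFrame indexed by observationId, with a "start_date"
# column added when the database provides a start time.
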
def read_ddf_visits(
    opsim_uri,
    start_time=None,
    end_time=None,
    dbcols=None,
    **kwargs,
):
    """Read DDF visits from an opsim database.

    Parameters
    ----------
    opsim_uri : `str`
        The uri from which to load visits.
    start_time : `str`, `astropy.time.Time`
        The start time for visits to be loaded.
    end_time : `str`, `astropy.time.Time`
        The end time for visits to be loaded.
    dbcols : `None` or `list` [`str`]
        Columns required from the database. Defaults to None, which uses
        all columns in the database.
    stackers : `list` [`rubin_sim.maf.stackers`], optional
        Stackers to be used to generate additional columns.

    Returns
    -------
    visits : `pandas.DataFrame`
        The visits and their parameters.
    """
    try:
        ddf_field_names = [f"DD:{field_name}" for field_name in ddf_locations().keys()]
        # Note that this where clause is hard-coded for target_name (v4+),
        # but the other columns in the query will be backwards-compatible.
        constraint = f"target_name IN {tuple(field_name for field_name in ddf_field_names)}"
        visits = read_opsim(
            opsim_uri,
            start_time=start_time,
            end_time=end_time,
            constraint=constraint,
            dbcols=dbcols,
            **kwargs,
        )
    except pd.errors.DatabaseError:
        # Older databases use 'target' rather than 'target_name', and do not
        # include the 'DD:' prefix in the field names.
        ddf_field_names = [f"{field_name}" for field_name in ddf_locations().keys()]
        constraint = f"target IN {tuple(field_name for field_name in ddf_field_names)}"
        visits = read_opsim(
            opsim_uri,
            start_time=start_time,
            end_time=end_time,
            constraint=constraint,
            dbcols=dbcols,
            **kwargs,
        )
        # There are even older databases which do not use 'target' at all,
        # but we'll leave those for now.

    return visits
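
# A hedged usage sketch (the URI and dates are hypothetical): collect deep
# drilling field visits over a year of a simulated survey.
#
#     ddf_visits = read_ddf_visits(
#         "baseline.db",
#         start_time="2025-10-01",
#         end_time="2026-10-01",
#     )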