Source code for schedview.compute.maf

import sqlite3
from pathlib import Path
from tempfile import TemporaryDirectory

import pandas as pd
from rubin_scheduler.scheduler.utils import SchemaConverter
from rubin_sim import maf

__all__ = ["compute_metric_by_visit"]


def _visits_to_opsim(visits, opsim):
    # Take advantage of the schema migration code in SchemaConverter to make
    # sure we have up to date column names.
    schema_converter = SchemaConverter()
    obs = schema_converter.opsimdf2obs(visits)
    updated_opsim = schema_converter.obs2opsim(obs)

    # We can't just use the update opsim as is, because it might drop columns
    # we want. Instead, merge the results back into the visits passed to us.
    restored_columns = set(visits.columns) - set(updated_opsim.columns)
    restored_columns.add("observationId")
    norm_visits = (
        visits.reset_index()
        if "observationId" not in visits.columns and visits.index.name == "observationId"
        else visits
    )

    merged_opsim = updated_opsim.merge(
        norm_visits.loc[:, list(restored_columns)], on="observationId", suffixes=("", "_orig")
    )

    # If the round trip change actually changes values in an existing column
    # without changing the names, the merge might not work correctly. If
    # this happens, the default "inner join" performed by DataFrame.merge
    # will drop visits. Double check that this hasn't happened.
    assert len(merged_opsim) == len(visits)

    with sqlite3.connect(opsim) as con:
        merged_opsim.to_sql("observations", con)


def compute_metric(visits, metric_bundle):
    """Compute metrics with MAF.

    Parameters
    ----------
    visits : `pandas.DataFrame`
        The DataFrame of visits (with column names matching those of opsim
        database).
    metric_bundle : `maf.MetricBundle`, `dict`, or `list` of `maf.MetricBundle`
        The metric bundle(s) to run.

    Returns
    -------
    bundle_group : `maf.MetricBundleGroup`
        The metric bundle group with the results.
    """
    passed_one_bundle = isinstance(metric_bundle, maf.MetricBundle)
    metric_bundles = [metric_bundle] if passed_one_bundle else metric_bundle

    with TemporaryDirectory() as working_dir:
        visits_db = Path(working_dir).joinpath("visits.db").as_posix()
        _visits_to_opsim(visits, visits_db)

        bundle_group = maf.MetricBundleGroup(metric_bundles, visits_db, out_dir=working_dir)
        bundle_group.run_all()

    return metric_bundle


[docs] def compute_metric_by_visit(visits, metric, constraint=""): """Compute a MAF metric by visit. Parameters ---------- visits : `pandas.DataFrame` The DataFrame of visits (with column names matching those of opsim database). metric : `rubin_sim.maf.metrics.BaseMetric` The metric to compute. constraint : `str` The SQL query to filter visits to be used. Returns ------- values : `pandas.Series` The metric values. """ slicer = maf.OneDSlicer("observationId", bin_size=1) metric_bundle = maf.MetricBundle(slicer=slicer, metric=metric, constraint=constraint) compute_metric(visits, metric_bundle) result = pd.Series(metric_bundle.metric_values, index=slicer.slice_points["bins"][:-1].astype(int)) result.index.name = "observationId" return result
[docs] def compute_hpix_metric_in_bands(visits, metric, constraint="", nside=32): """Compute a MAF metric by visit. Parameters ---------- visits : `pandas.DataFrame` The DataFrame of visits (with column names matching those of opsim database). metric : `rubin_sim.maf.metrics.BaseMetric` The metric to compute. constraint : `str` The SQL query to filter visits to be used. nside : `int` The healpix nside of the healpix arrays to return. Returns ------- metric_values : `dict` A dictionary of healpix arrays, where the keys are the filters with visits in the input visit DataFrame. """ # Do only the filters we actually used used_filters = visits["filter"].unique() bundles = {} for this_filter in used_filters: this_constraint = f"filter == '{this_filter}'" if len(constraint) > 0: this_constraint += f" AND {constraint}" slicer = maf.HealpixSlicer(nside=nside, verbose=False) bundles[this_filter] = maf.MetricBundle(metric, slicer, this_constraint) compute_metric(visits, bundles) metric_values = {b: bundles[b].metric_values for b in bundles if bundles[b].metric_values is not None} return metric_values