Source code for schedview.compute.maf

import sqlite3
from pathlib import Path
from tempfile import TemporaryDirectory

import pandas as pd
from rubin_scheduler.scheduler.utils import SchemaConverter
from rubin_sim import maf

__all__ = ["compute_metric_by_visit", "compute_hpix_metric_in_bands"]


def _visits_to_opsim(visits, opsim):
    # Only write columns in visits that are in opsim databases,
    # thereby avoiding added columns whose types might not be
    # writable to sqlite databases.
    opsim_columns = list(SchemaConverter().convert_dict.keys())
    with sqlite3.connect(opsim) as connection:
        visits.reset_index()[opsim_columns].to_sql("observations", connection, index=False)
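
# A minimal sketch of exercising _visits_to_opsim on its own (illustrative
# only; the file name and the read-back check are assumptions, not part of
# this module):
#
#     with TemporaryDirectory() as temp_dir:
#         opsim_path = Path(temp_dir).joinpath("check.db").as_posix()
#         _visits_to_opsim(visits, opsim_path)
#         with sqlite3.connect(opsim_path) as connection:
#             round_trip = pd.read_sql("SELECT * FROM observations", connection)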


def compute_metric(visits, metric_bundle):
    """Compute metrics with MAF.

    Parameters
    ----------
    visits : `pandas.DataFrame`
        The DataFrame of visits (with column names matching those of opsim
        database).
    metric_bundle : `maf.MetricBundle`, `dict`, or `list` of `maf.MetricBundle`
        The metric bundle(s) to run.

    Returns
    -------
    metric_bundle : `maf.MetricBundle`, `dict`, or `list` of `maf.MetricBundle`
        The metric bundle(s) passed in, with their metric values computed.
    """
    passed_one_bundle = isinstance(metric_bundle, maf.MetricBundle)
    metric_bundles = [metric_bundle] if passed_one_bundle else metric_bundle

    with TemporaryDirectory() as working_dir:
        visits_db = Path(working_dir).joinpath("visits.db").as_posix()
        _visits_to_opsim(visits, visits_db)

        bundle_group = maf.MetricBundleGroup(metric_bundles, visits_db, out_dir=working_dir)
        bundle_group.run_all()

    return metric_bundle
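
# Illustrative sketch of calling compute_metric directly with a single bundle
# (the choice of metric and slicer here is an assumption for illustration,
# not taken from schedview itself):
#
#     metric = maf.MeanMetric(col="airmass")
#     slicer = maf.OneDSlicer("observationId", bin_size=1)
#     bundle = maf.MetricBundle(metric=metric, slicer=slicer, constraint="")
#     compute_metric(visits, bundle)
#     values = bundle.metric_values  # filled in by MetricBundleGroup.run_all()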




def compute_metric_by_visit(visits, metric, constraint=""):
    """Compute a MAF metric by visit.

    Parameters
    ----------
    visits : `pandas.DataFrame`
        The DataFrame of visits (with column names matching those of opsim
        database).
    metric : `rubin_sim.maf.metrics.BaseMetric`
        The metric to compute.
    constraint : `str`
        The SQL query to filter visits to be used.

    Returns
    -------
    values : `pandas.Series`
        The metric values.
    """
    slicer = maf.OneDSlicer("observationId", bin_size=1)
    metric_bundle = maf.MetricBundle(slicer=slicer, metric=metric, constraint=constraint)
    compute_metric(visits, metric_bundle)

    result = pd.Series(
        metric_bundle.metric_values,
        index=slicer.slice_points["bins"][:-1].astype(int),
    )
    result.index.name = "observationId"
    return result
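
# Hedged usage sketch for compute_metric_by_visit (the metric choice is an
# assumption; any per-visit rubin_sim.maf metric should work):
#
#     airmass_by_visit = compute_metric_by_visit(
#         visits, maf.MeanMetric(col="airmass"), constraint=""
#     )
#     # airmass_by_visit is a pandas.Series indexed by observationId.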


def compute_hpix_metric_in_bands(visits, metric, constraint="", nside=32):
    """Compute a MAF metric on a healpix grid, one map per band.

    Parameters
    ----------
    visits : `pandas.DataFrame`
        The DataFrame of visits (with column names matching those of opsim
        database).
    metric : `rubin_sim.maf.metrics.BaseMetric`
        The metric to compute.
    constraint : `str`
        The SQL query to filter visits to be used.
    nside : `int`
        The healpix nside of the healpix arrays to return.

    Returns
    -------
    metric_values : `dict`
        A dictionary of healpix arrays, where the keys are the filters with
        visits in the input visit DataFrame.
    """
    # Do only the filters we actually used
    used_filters = visits["filter"].unique()

    bundles = {}
    for this_filter in used_filters:
        this_constraint = f"filter == '{this_filter}'"
        if len(constraint) > 0:
            this_constraint += f" AND {constraint}"
        slicer = maf.HealpixSlicer(nside=nside, verbose=False)
        bundles[this_filter] = maf.MetricBundle(metric, slicer, this_constraint)

    compute_metric(visits, bundles)

    metric_values = {
        b: bundles[b].metric_values for b in bundles if bundles[b].metric_values is not None
    }
    return metric_values
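
# Hedged usage sketch for compute_hpix_metric_in_bands (the nside and metric
# are assumptions for illustration):
#
#     hpix_counts = compute_hpix_metric_in_bands(
#         visits, maf.CountMetric(col="observationId"), nside=32
#     )
#     # hpix_counts maps each filter (e.g. "g", "r") to a healpix array of
#     # per-pixel metric values.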