Source code for schedview.compute.scheduler

import copy
import datetime
import inspect
import lzma
import numbers
import pickle

import numpy as np
import pandas as pd
from astropy.time import Time
from astropy.timeseries import TimeSeries
from rubin_scheduler.scheduler import sim_runner, surveys
from rubin_scheduler.scheduler.example import example_scheduler
from rubin_scheduler.scheduler.model_observatory import ModelObservatory
from rubin_scheduler.scheduler.utils import SchemaConverter, empty_observation


def _make_observation_from_record(record):
    """Convert an opsim visit record to a scheduler observation

    Parameters
    ----------
    record : `dict`
        A row from an opsim output table

    Returns
    -------
    observation : `numpy.ndarray`
        A numpy recarray with data understood by the scheduler
    """
    observation = empty_observation()
    observation["RA"] = np.radians(record["fieldRA"])
    observation["dec"] = np.radians(record["fieldDec"])
    observation["mjd"] = record["observationStartMJD"]
    observation["flush_by_mjd"] = record["flush_by_mjd"]
    observation["exptime"] = record["visitExposureTime"]
    observation["filter"] = record["filter"]
    observation["rotSkyPos"] = np.radians(record["rotSkyPos"])
    observation["rotSkyPos_desired"] = np.radians(record["rotSkyPos_desired"])
    observation["nexp"] = record["numExposures"]
    observation["airmass"] = record["airmass"]
    observation["FWHM_500"] = record["seeingFwhm500"]
    observation["FWHMeff"] = record["seeingFwhmEff"]
    observation["FWHM_geometric"] = record["seeingFwhmGeom"]
    observation["skybrightness"] = record["skyBrightness"]
    observation["night"] = record["night"]
    observation["slewtime"] = record["slewTime"]
    observation["visittime"] = record["visitTime"]
    observation["slewdist"] = np.radians(record["slewDistance"])
    observation["fivesigmadepth"] = record["fiveSigmaDepth"]
    observation["alt"] = np.radians(record["altitude"])
    observation["az"] = np.radians(record["azimuth"])
    observation["pa"] = np.radians(record["paraAngle"])
    observation["clouds"] = record["cloud"]
    observation["moonAlt"] = np.radians(record["moonAlt"])
    observation["sunAlt"] = np.radians(record["sunAlt"])
    observation["note"] = record["note"]
    observation["field_id"] = record["fieldId"]
    observation["survey_id"] = record["proposalId"]
    observation["block_id"] = record["block_id"]
    observation["lmst"] = record["observationStartLST"] / 15
    observation["rotTelPos"] = np.radians(record["rotTelPos"])
    observation["rotTelPos_backup"] = np.radians(record["rotTelPos_backup"])
    observation["moonAz"] = np.radians(record["moonAz"])
    observation["sunAz"] = np.radians(record["sunAz"])
    observation["sunRA"] = np.radians(record["sunRA"])
    observation["sunDec"] = np.radians(record["sunDec"])
    observation["moonRA"] = np.radians(record["moonRA"])
    observation["moonDec"] = np.radians(record["moonDec"])
    observation["moonDist"] = np.radians(record["moonDistance"])
    observation["solarElong"] = np.radians(record["solarElong"])
    observation["moonPhase"] = record["moonPhase"]
    observation["cummTelAz"] = np.radians(record["cummTelAz"])
    observation["scripted_id"] = record["scripted_id"]
    return observation


[docs] def replay_visits(scheduler, visits): """Update a scheduler instances with a set of visits. Parameters ---------- scheduler : `rubin_scheduler.scheduler.CoreScheduler` An instance of the scheduler to update visits : `pandas.DataFrame` A table of visits to add. """ for visit_id, visit_record in visits.iterrows(): obs = _make_observation_from_record(visit_record) scheduler.add_observation(obs) scheduler.conditions.mjd = float(obs["mjd"] + (obs["slewtime"] + obs["visittime"]) / (24 * 60 * 60))
def compute_basis_function_reward_at_time(scheduler, time, observatory=None): if observatory is None: observatory = ModelObservatory(nside=scheduler.nside) ap_time = Time(time) observatory.mjd = ap_time.mjd conditions = observatory.return_conditions() reward_df = scheduler.make_reward_df(conditions) summary_df = reward_df.reset_index() def make_tier_name(row): tier_name = f"tier {row.list_index}" return tier_name summary_df["tier"] = summary_df.apply(make_tier_name, axis=1) def get_survey_name(row): try: survey_name = scheduler.survey_lists[row.list_index][row.survey_index].survey_name except AttributeError: survey_name = "" if len(survey_name) == 0: class_name = scheduler.survey_lists[row.list_index][row.survey_index].__class__.__name__ survey_name = f"{class_name}_{row.list_index}_{row.survey_index}" return survey_name summary_df["survey_name"] = summary_df.apply(get_survey_name, axis=1) def make_survey_row(survey_bfs): infeasible_bf = ", ".join(survey_bfs.loc[~survey_bfs.feasible.astype(bool)].basis_function.to_list()) infeasible = ~np.all(survey_bfs.feasible.astype(bool)) reward = survey_bfs.max_accum_reward.iloc[-1] survey_row = pd.Series( { "reward": reward, "infeasible": infeasible, "infeasible_bfs": infeasible_bf, } ) return survey_row survey_df = summary_df.groupby(["tier", "survey_name"]).apply(make_survey_row) return survey_df def compute_basis_function_rewards(scheduler, sample_times=None, observatory=None): if observatory is None: observatory = ModelObservatory(nside=scheduler.nside) if sample_times is None: # Compute values for the current night by default. sample_times = pd.date_range( Time(float(scheduler.conditions.sun_n12_setting), format="mjd", scale="utc").datetime, Time(float(scheduler.conditions.sun_n12_rising), format="mjd", scale="utc").datetime, freq="10T", ) if isinstance(sample_times, TimeSeries): sample_times = sample_times.to_pandas() reward_list = [] for time in sample_times: this_time_reward = compute_basis_function_reward_at_time(scheduler, time, observatory) this_time_reward["mjd"] = Time(time).mjd this_time_reward["time"] = time reward_list.append(this_time_reward) rewards = pd.concat(reward_list).reset_index() return rewards def _normalize_time(this_time): if isinstance(this_time, Time): this_time = this_time if this_time is None: this_time = Time.now(scale="utc") elif isinstance(this_time, numbers.Number) and 51544 < this_time < 88069: this_time = Time(this_time, format="mjd", scale="utc") elif isinstance(this_time, str) or isinstance(this_time, datetime.datetime): this_time = Time(this_time, scale="utc") elif isinstance(this_time, pd.Timestamp): this_time = Time(this_time.to_pydatetime(), scale="utc") return this_time
[docs] def create_example( current_time=None, survey_start="2025-01-01T16:00:00Z", nside=None, simulate=True, scheduler_pickle_fname=None, opsim_db_fname=None, rewards_fname=None, ): """Create an example scheduler and observatory. Parameters ---------- current_time : `float`, `str`, `datetime.datetime`, `pandas.Timestamp`, or `astropy.time.Time` The time to initialize the observatory and conditions to. Floats are interpreted as MJD. Strings are interpreted as UTC. If None, use the current time. Defaults no None. survey_start : `float`, `str`, `datetime.datetime`, `pandas.Timestamp`, or `astropy.time.Time` The survey start time. nside : `int` The nside to use for the scheduler and observatory. If None, use the default nside for the example scheduler. simulate : `bool` Run a sample simulation from survey_start to current_time scheduler_pickle_fname : `str` The filename to save the scheduler to. opsim_db_fname : `str` The filename to save the opsim database to. rewards_fname : `str` The filename to save the rewards to. Returns ------- scheduler : `rubin_scheduler.scheduler.schedulers.CoreScheduler` The scheduler instance. observatory : `rubin_scheduler.models.ModelObservatory` The observatory instance. conditions : `rubin_scheduler.scheduler.features.Conditions` The conditions at the current time. observations : `pd.DataFrame` The observations from the simulation. """ record_rewards = rewards_fname is not None current_time = _normalize_time(current_time) survey_start = _normalize_time(survey_start) if nside is None: scheduler = example_scheduler(mjd_start=survey_start.mjd) nside = scheduler.nside else: scheduler = example_scheduler(nside=nside, mjd_start=survey_start.mjd) observatory = ModelObservatory(nside=nside, mjd_start=survey_start.mjd) if simulate: sim_duration = current_time.mjd - survey_start.mjd if record_rewards: scheduler.keep_rewards = True observatory, scheduler, observations, reward_df, obs_rewards = sim_runner( observatory, scheduler, mjd_start=survey_start.mjd, survey_length=sim_duration, record_rewards=True, ) reward_df.to_hdf(rewards_fname, "reward_df") obs_rewards.to_hdf(rewards_fname, "obs_rewards") else: observatory, scheduler, observations = sim_runner( observatory, scheduler, mjd_start=survey_start.mjd, survey_length=sim_duration, ) if opsim_db_fname is not None: converter = SchemaConverter() converter.obs2opsim(observations, filename=opsim_db_fname, delete_past=True) else: observations = None observatory.mjd = current_time.mjd conditions = observatory.return_conditions() scheduler.update_conditions(conditions) if scheduler_pickle_fname is not None: if scheduler_pickle_fname.endswith(".xz"): with lzma.open(scheduler_pickle_fname, "wb", format=lzma.FORMAT_XZ) as out_file: pickle.dump((scheduler, scheduler.conditions), out_file) else: with open(scheduler_pickle_fname, "wb") as out_file: pickle.dump((scheduler, scheduler.conditions), out_file) result = (scheduler, observatory, conditions, observations) if record_rewards: result += (reward_df, obs_rewards) return result
[docs] def make_unique_survey_name(scheduler, survey_index=None): """Make a unique survey name for a given survey index. Parameters ---------- scheduler : `rubin_scheduler.scheduler.schedulers.CoreScheduler` The scheduler instance. survey_index : `list` of `int` The index of the survey to name. If None, use the current survey. Returns ------- survey_name : `str` A unique name for the survey. """ if survey_index is None: survey_index = copy.deepcopy(scheduler.survey_index) # slice down through as many indexes as we have survey = scheduler.survey_lists for level_index in survey_index: survey = survey[level_index] try: survey_name = survey.survey_name if len(survey_name) < 1: survey_name = str(survey) except AttributeError: survey_name = str(survey) # For auxtel, different fields have the same survey_name, but # the interface should show the field name. So, if we're # getting a field name in note, use that instead of the survey_name # attribute. try: observation_note = f"{survey.observations['note'][0]}" except (AttributeError, ValueError, TypeError): observation_note = None if (observation_note is not None) and (survey.survey_name != observation_note): survey_name = f"{survey.observations['note'][0]}" survey_name = f"{survey_index[1]}: {survey_name}" # Bokeh tables have problems with < and > survey_name = survey_name.replace("<", "").replace(">", "") return survey_name
[docs] def make_scheduler_summary_df(scheduler, conditions, reward_df=None): """Summarize the reward from each scheduler Parameters ---------- scheduler : `rubin_scheduler.scheduler.schedulers.CoreScheduler` The scheduler instance. conditions : `rubin_scheduler.scheduler.features.conditions.Conditions` The conditions for which to summarize the reward. reward_df : `pandas.DataFrame` The table with rewards for each survey. If None, calculate it. Returns ------- survey_df : `pandas.DataFrame` A table showing the reword for each feasible survey, and the basis functions that result in it being infeasible for the rest. """ if conditions is None: conditions = scheduler.conditions if reward_df is None: reward_df = scheduler.make_reward_df(conditions) summary_df = reward_df.reset_index() # Some oddball surveys do not have basis functions, but they still # need rows, so add fake basis functions to the summary_df for them. summary_df.set_index(["list_index", "survey_index"], inplace=True) for list_index, survey_list in enumerate(scheduler.survey_lists): for survey_index, survey in enumerate(survey_list): if (list_index, survey_index) not in summary_df.index: survey.calc_reward_function(conditions) summary_df.loc[(list_index, survey_index), "max_basis_reward"] = survey.reward summary_df.loc[(list_index, survey_index), "max_accum_reward"] = survey.reward summary_df.loc[(list_index, survey_index), "feasible"] = ( np.isfinite(survey.reward) or survey.reward > 0 ) summary_df.loc[(list_index, survey_index), "basis_function"] = "N/A" summary_df.reset_index(inplace=True) def make_tier_name(row): tier_name = f"tier {row.list_index}" return tier_name summary_df["tier"] = summary_df.apply(make_tier_name, axis=1) def get_survey_name(row): survey_name = make_unique_survey_name(scheduler, [row.list_index, row.survey_index]) return survey_name summary_df["survey_name_with_id"] = summary_df.apply(get_survey_name, axis=1) standard_surveys = [c[0] for c in inspect.getmembers(surveys)] def get_survey_url(row): url_base = "https://rubin-scheduler.lsst.io/fbs-api-surveys.html" if isinstance(row.survey_class, str) and row.survey_class in standard_surveys: section_base = "rubin_scheduler.scheduler.surveys" survey_url = f"{url_base}#{section_base}.{row.survey_class}" else: generic_survey = "rubin_scheduler.scheduler.surveys.BaseSurvey" survey_url = f"{url_base}#{generic_survey}" return survey_url summary_df["survey_url"] = summary_df.apply(get_survey_url, axis=1) def make_survey_row(survey_bfs): infeasible_bf = ", ".join(survey_bfs.loc[~survey_bfs.feasible.astype(bool)].basis_function.to_list()) infeasible = ~np.all(survey_bfs.feasible.astype(bool)) reward = infeasible_bf if infeasible else survey_bfs.max_accum_reward.iloc[-1] if reward in (None, "N/A", "None"): reward = "Infeasible" if infeasible else "Feasible" survey_row = pd.Series({"reward": reward, "infeasible": infeasible}) return survey_row survey_df = summary_df.groupby( ["list_index", "survey_index", "survey_name_with_id", "survey_url", "tier"] ).apply(make_survey_row) return survey_df["reward"].reset_index()