Source code for schedview.plot.scheduler

import collections.abc
import itertools
import logging
import warnings
from collections import OrderedDict

import bokeh.core.properties
import bokeh.models
import bokeh.plotting
import healpy as hp
import numpy as np
import pandas as pd
import rubin_scheduler.scheduler.basis_functions
import rubin_scheduler.scheduler.example
import rubin_scheduler.scheduler.schedulers
import rubin_scheduler.scheduler.surveys
from astropy.time import Time
from rubin_scheduler.scheduler.features.conditions import Conditions
from rubin_scheduler.scheduler.model_observatory import ModelObservatory
from rubin_scheduler.scheduler.schedulers.core_scheduler import CoreScheduler as CoreScheduler
from rubin_scheduler.utils import survey_start_mjd
from uranography.api import ArmillarySphere, HorizonMap, MollweideMap, Planisphere, make_zscale_linear_cmap

from schedview.collect import read_scheduler
from schedview.compute.scheduler import make_scheduler_summary_df, make_unique_survey_name
from schedview.compute.survey import make_survey_reward_df

DEFAULT_MJD = survey_start_mjd() + 0.2
DEFAULT_NSIDE = 32


def make_logger():
    logger = logging.getLogger("sched_logger")
    logger.setLevel(logging.DEBUG)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter("%(asctime)s: %(message)s")
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    return logger


LOGGER = make_logger()


[docs] class BadSchedulerError(Exception): pass
[docs] class BadConditionsError(Exception): pass
class SchedulerDisplay: tooltips = [ ("RA", "@center_ra"), ("Decl", "@center_decl"), ] key_markup = """<h1>Key</h1> <ul> <li><b>Black line</b> Horizon</li> <li><b>Red line</b> ZD=70 deg.</li> <li><b>Green line</b> Ecliptic</li> <li><b>Blue line</b> Galactic plane</li> <li><b>Yellow dot</b> Sun position</li> <li><b>Gray dot</b> Moon position</li> <li><b>Red dot</b> Survey field(s)</li> <li><b>Green dot</b> Telescope pointing</li> </ul> """ def __init__(self, init_key="AvoidDirectWind", nside=DEFAULT_NSIDE, scheduler=None): self._scheduler = None self.survey_index = [None, None] self.healpix_maps = OrderedDict() self.init_key = init_key self.map_key = init_key self.nside = nside self.healpix_cmap = None self.data_sources = {} self.glyphs = {} self.bokeh_models = {} self.sphere_maps = {} self._figure = None mjd = Time.now().mjd if DEFAULT_MJD is None else DEFAULT_MJD try: self.observatory = ModelObservatory(mjd_start=mjd - 1, nside=nside) except ValueError: self.observatory = None if scheduler is None: scheduler = rubin_scheduler.scheduler.example.example_scheduler( nside=nside, mjd_start=DEFAULT_MJD ) if self.observatory is not None: conditions = self.observatory.return_conditions() else: conditions = Conditions(mjd_start=mjd - 1, nside=nside) conditions.mjd = mjd scheduler.update_conditions(conditions) scheduler.request_observation() self.scheduler = scheduler @property def map_keys(self): """Return keys for the available healpix maps""" keys = list(self.healpix_maps.keys()) return keys @property def mjd(self): # Sometimes conditions.mjd is a one-d numpy array, sometimes a float. # make sure we always return a float. return float(self.conditions.mjd) @mjd.setter def mjd(self, value): """Update the interface for a new date Parameters ---------- value : `float` The new MJD """ # Sometimes a loaded pickle will have close to a represented # time, but not identical, and we do not want to try to recreate # the conditions object if we have loaded it and not changed the # time. So, check only that the mjd is close, not that it # is identical. if np.abs(value - self.mjd) < (1.0 / (24 * 60 * 60)): # Nothing needs to be done return if self.observatory.nside != self.scheduler.nside: self.observatory = ModelObservatory( nside=self.scheduler.nside, mjd_start=self.observatory.mjd_start, alt_min=np.degrees(self.observatory.alt_min), lax_dome=self.observatory.lax_dome, cloud_limit=self.observatory.cloud_limit, sim_to_o=self.observatory.sim__to_o, park_after=self.observatory.park_after * 60 * 24, ) LOGGER.info(f"Creating conditions for mjd {value}") try: # If we cannot go to the requested MJD, follback on # on we can go to: if value < self.observatory.sky_model.mjd_left.min(): LOGGER.info("Cannot go to requested date, going to earliest.") self.observatory.mjd = self.observatory.sky_model.mjd_left.min() + 1.0 elif value > self.observatory.sky_model.mjd_right.max(): LOGGER.info("Cannot go to requested date, going to latest.") self.observatory.mjd = self.observatory.sky_model.mjd_right.max() - 1.0 else: self.observatory.mjd = value conditions = self.observatory.return_conditions() # Make sure we have set a time at night, and if we have night # go to the sunsrise or sunset on the same night. if conditions.sun_n18_setting > self.observatory.mjd: self.observatory.mjd = conditions.sun_n18_setting conditions = self.observatory.return_conditions() if conditions.sun_n18_rising < self.observatory.mjd: self.observatory.mjd = conditions.sun_n18_rising conditions = self.observatory.return_conditions() LOGGER.info("Conditions created") except (ValueError, AttributeError): # If we do not have the right cache of sky brightness # values on disk, we may not be able to instantiate # ModelObservatory, but we should be able to run # it anyway. Fake up a conditions object as well as # we can. conditions = Conditions(mjd_start=value - 1, nside=self.nside) conditions.mjd = value LOGGER.warning("Created dummy conditions.") self.conditions = conditions @property def time(self): """Return the time as an astropy Time objec. Return ------ time : `astropy.time.Time` The time """ time = Time(self.mjd, format="mjd", scale="utc") time.format = "iso" return time @time.setter def time(self, time): """Set the time according to a time string. Parameters ---------- time : `astropy.time.Time` or `str` The new time Parameterers are the same as for `pandas.to_datetime`. """ if isinstance(time, Time): new_mjd = time.mjd elif isinstance(time, pd.Timestamp): new_mjd = time.to_julian_date() - 2400000.5 else: try: new_mjd = Time(time).mjd except ValueError: new_mjd = pd.to_datetime(time, utc=True).to_julian_date() - 2400000.5 self.mjd = new_mjd def _update_healpix_maps(self): """Update healpix values from the scheduler.""" # Be sure we keep using the same dictionary, and just update it, # rather than use a new one because any new one we make won't propogate # into other callbacks. self.healpix_maps.clear() full_healpix_maps = self.scheduler.get_healpix_maps( survey_index=self.survey_index, conditions=self.conditions ) for key in full_healpix_maps: new_key = key.replace(" ", "_").replace(".", "_").replace("@", "_") values = full_healpix_maps[key] if values.shape[0] != hp.nside2npix(self.nside): values[np.isnan(values)] = hp.UNSEEN values = hp.ud_grade( values, self.nside, ) values[values == hp.UNSEEN] = np.nan self.healpix_maps[new_key] = values survey = self.scheduler.survey_lists[self.survey_index[0]][self.survey_index[1]] reward = survey.calc_reward_function(self.conditions) if not (isinstance(reward, np.ndarray) and len(reward) > 1): try: basis_weights = survey.basis_weights basis_functions = survey.basis_functions supported_survey = True except AttributeError: supported_survey = False if supported_survey: npix = hp.nside2npix(self.nside) reward = np.zeros(npix) indx = np.arange(npix) for bf, weight in zip(basis_functions, basis_weights): basis_value = bf(self.conditions, indx=indx) if isinstance(basis_value, np.ndarray): basis_value[np.isnan(basis_value)] = hp.UNSEEN basis_value = hp.ud_grade(basis_value, self.nside) basis_value[basis_value == hp.UNSEEN] = np.nan reward += basis_value * weight if isinstance(reward, np.ndarray) and len(reward) > 1: if reward.shape[0] != hp.nside2npix(self.nside): reward[np.isnan(reward)] = hp.UNSEEN reward = hp.ud_grade( reward, self.nside, ) reward[reward == hp.UNSEEN] = np.nan if np.any(np.isfinite(reward)): self.healpix_maps["reward"] = reward @property def healpix_values(self): """Healpix numpy array for the current map.""" if len(self.healpix_maps) == 0: npix = hp.nside2npix(self.nside) values = np.ones(npix) return values return self.healpix_maps[self.map_key] def load(self, file_name): """Load scheduler data Parameters ---------- file_name : `str` The file name from which to load scheduler state. """ scheduler, conditions = read_scheduler(file_name) if not isinstance(scheduler, CoreScheduler): raise BadSchedulerError() if not isinstance(conditions, Conditions): raise BadConditionsError() scheduler.update_conditions(conditions) self.scheduler = scheduler @property def scheduler(self): return self._scheduler @scheduler.setter def scheduler(self, scheduler): """Set the scheduler visualized. Parameters ---------- scheduler : `rubin_scheduler.scheduler.schedulers.CoreScheduler` The new scheduler to visualize """ # Work separated into _set_scheduler so that it can be overriden by # subclasses. self._set_scheduler(scheduler) def _set_scheduler(self, scheduler): LOGGER.debug("Setting the scheduler") self._scheduler = scheduler self.survey_index[0] = self.scheduler.survey_index[0] self.survey_index[1] = self.scheduler.survey_index[1] if self.survey_index[0] is None: self.survey_index = [0, 0] if self.survey_index[1] is None: self.survey_index[1] = 0 self.conditions = scheduler.conditions @property def conditions(self): return self.scheduler.conditions @conditions.setter def conditions(self, conditions): """Update the figure to represent changed conditions. Parameters ---------- conditions : `rubin_scheduler.scheduler.features.conditions.Conditions` The new conditions. """ if conditions.nside != self.nside: warnings.warn("Setting conditions to an unequal nside.") self._set_conditions(conditions) def _set_conditions(self, conditions): # Separated from the decorated setter so that it can be overridden # in a subclass. LOGGER.info("Updating interface for new conditions") self.scheduler.update_conditions(conditions) self.scheduler.request_observation() self._update_healpix_maps() # If the current map is no longer valid, pick a valid one. # Otherwise, keep displaying the same map. if self.map_key not in self.map_keys: self.map_key = self.map_keys[-1] for sphere_map in self.sphere_maps.values(): sphere_map.mjd = self.mjd if "armillary_sphere" in self.sphere_maps: self.sphere_maps["armillary_sphere"].sliders["mjd"].value = self.sphere_maps[ "armillary_sphere" ].mjd LOGGER.info("Finished updating conditions") def _unique_survey_name(self, survey_index=None): survey_name = make_unique_survey_name(self.scheduler, survey_index) return survey_name @property def tier_names(self): """List of names of tiers in current survey.""" tiers = [f"tier {t}" for t in np.arange(len(self.scheduler.survey_lists))] return tiers def select_tier(self, tier): """Set the tier being displayed.""" LOGGER.info(f"swiching tier to {tier}") self.survey_index[0] = self.tier_names.index(tier) self.survey_index[1] = 0 @property def surveys_in_tier(self): """List of surveys in the current tier.""" tier = self.survey_index[0] surveys_in_tier = [ self._unique_survey_name([tier, i]) for i in range(len(self.scheduler.survey_lists[tier])) ] return surveys_in_tier def select_survey(self, survey): """Update the display to show a given survey. Parameters ---------- survey : `str` The name of the survey to select. """ # keep using the same survey_index list, and just update it, # not create a new one, because any new one we make won't propogate # into other callbacks. self.survey_index[1] = self.surveys_in_tier.index(survey) self._update_healpix_maps() def select_value(self, map_key): """Select the value to be displayed on the maps. Parameters ---------- map_key : `str` The name of the value to be mapped """ LOGGER.info(f"Switching value to {map_key}") self.map_key = map_key def make_sphere_map( self, key, cls, title, frame_width=512, frame_height=512, decorate=True, horizon_graticules=False, ): if "hover_tool" not in self.bokeh_models: self.bokeh_models["hover_tool"] = bokeh.models.HoverTool(renderers=[], tooltips=self.tooltips) plot = bokeh.plotting.figure( frame_width=frame_width, frame_height=frame_height, tools=[self.bokeh_models["hover_tool"]], match_aspect=True, title=title, output_backend="webgl", ) sphere_map = cls(plot=plot, mjd=self.mjd) if "healpix" in self.data_sources: sphere_map.add_healpix( self.data_sources["healpix"], cmap=self.healpix_cmap, nside=self.nside, ) else: sphere_map.add_healpix(self.healpix_values, nside=self.nside) self.data_sources["healpix"] = sphere_map.plot.select(name="hpix_ds")[0] self.healpix_cmap = sphere_map.healpix_cmap healpix_renderer = sphere_map.plot.select(name="hpix_renderer")[0] self.bokeh_models["hover_tool"].renderers.append(healpix_renderer) if "horizon" in self.data_sources: sphere_map.add_horizon(data_source=self.data_sources["horizon"]) else: self.data_sources["horizon"] = sphere_map.add_horizon() if "zd70" in self.data_sources: sphere_map.add_horizon( zd=70, data_source=self.data_sources["zd70"], line_kwargs={"color": "red", "line_width": 2}, ) else: self.data_sources["zd70"] = sphere_map.add_horizon( zd=70, line_kwargs={"color": "red", "line_width": 2} ) if horizon_graticules: sphere_map.add_horizon_graticules() if decorate: sphere_map.decorate() if "survey_marker" not in self.data_sources: self.data_sources["survey_marker"] = self.make_survey_marker_data_source(sphere_map) sphere_map.add_marker( data_source=self.data_sources["survey_marker"], name="Field", circle_kwargs={"color": "red", "fill_alpha": 0.5}, ) if "telescope_marker" not in self.data_sources: self.data_sources["telescope_marker"] = self.make_telescope_marker_data_source(sphere_map) sphere_map.add_marker( data_source=self.data_sources["telescope_marker"], name="Field", circle_kwargs={"color": "green", "fill_alpha": 0.5}, ) if "moon_marker" not in self.data_sources: self.data_sources["moon_marker"] = self.make_moon_marker_data_source(sphere_map) sphere_map.add_marker( data_source=self.data_sources["moon_marker"], name="Moon", circle_kwargs={"color": "lightgray", "fill_alpha": 0.8}, ) if "sun_marker" not in self.data_sources: self.data_sources["sun_marker"] = self.make_sun_marker_data_source(sphere_map) sphere_map.add_marker( data_source=self.data_sources["sun_marker"], name="Sun", circle_kwargs={"color": "yellow", "fill_alpha": 1}, ) self.bokeh_models[key] = plot self.sphere_maps[key] = sphere_map def _make_marker_data_source( self, sphere_map=None, name="telescope", source_name="conditions", ra_name="telRA", decl_name="telDec", source_units="radians", ): """Create a bokeh datasource for an object at a set of coordinates. Parameters ---------- sphere_map: `schedview.plot.SphereMap` The instance of SphereMap to use to create the data source name : 'str' The name of the thing to mark. source_name : `str` The name of the member object to provide the coordinates. ra_name : `str` The name of the member with the RA. decl_name : `str` The name of the member with the declination. source_units : `str` 'radians' or 'degrees', according to what is provided by the source Returns ------- data_source: `bokeh.models.ColumnDataSource` The DataSource with the column data. """ if sphere_map is None: sphere_map = tuple(self.sphere_maps.values())[0] sources = { "conditions": self.conditions, "survey": self.scheduler.survey_lists[self.survey_index[0]][self.survey_index[1]], } source = sources[source_name] # If the telescope position is not set in our instance of # conditions, use an empty array ra = getattr(source, ra_name, np.array([])) decl = getattr(source, decl_name, np.array([])) if ra is None: ra = np.array([]) if decl is None: decl = np.array([]) LOGGER.debug(f"{name.capitalize()} coordinates: ra={np.degrees(ra)}, decl={np.degrees(decl)}") if source_units == "radians": ra_deg = np.degrees(ra) decl_deg = np.degrees(decl) elif source_units in ("degrees", "deg"): ra_deg = ra decl_deg = decl data = { "ra": ra_deg.tolist(), "decl": decl_deg.tolist(), "name": [name] * len(ra_deg), "glyph_size": [20] * len(ra_deg), } data_source = bokeh.models.ColumnDataSource(data=data, name=name) sphere_map.connect_controls(data_source) return data_source def make_moon_marker_data_source(self, sphere_map=None): """Create a bokeh datasource for the moon. Parameters ---------- sphere_map: `schedview.plot.SphereMap` The instance of SphereMap to use to create the data source Returns ------- data_source: `bokeh.models.ColumnDataSource` The DataSource with the column data. """ data_source = self._make_marker_data_source( sphere_map=sphere_map, name="moon", source_name="conditions", ra_name="moonRA", decl_name="moonDec", source_units="radians", ) return data_source def update_moon_marker_bokeh_model(self): """Update the moon data source.""" if "telescope_marker" not in self.data_sources: return sphere_map = tuple(self.sphere_maps.values())[0] data_source = self.make_moon_marker_data_source(sphere_map) data = dict(data_source.data) if "moon_marker" in self.data_sources: self.data_sources["moon_marker"].data = data def make_sun_marker_data_source(self, sphere_map=None): """Create a bokeh datasource for the sun. Parameters ---------- sphere_map: `schedview.plot.SphereMap` The instance of SphereMap to use to create the data source Returns ------- data_source: `bokeh.models.ColumnDataSource` The DataSource with the column data. """ data_source = self._make_marker_data_source( sphere_map=sphere_map, name="sun", source_name="conditions", ra_name="sunRA", decl_name="sunDec", source_units="radians", ) return data_source def update_sun_marker_bokeh_model(self): """Update the sun data source.""" if "telescope_marker" not in self.data_sources: return sphere_map = tuple(self.sphere_maps.values())[0] data_source = self.make_sun_marker_data_source(sphere_map) data = dict(data_source.data) if "sun_marker" in self.data_sources: self.data_sources["sun_marker"].data = data def make_telescope_marker_data_source(self, sphere_map=None): """Create a bokeh datasource for the current telescope pointing. Parameters ---------- sphere_map: `schedview.plot.SphereMap` The instance of SphereMap to use to create the data source Returns ------- data_source: `bokeh.models.ColumnDataSource` The DataSource with the column data. """ data_source = self._make_marker_data_source( sphere_map=sphere_map, name="telescope", source_name="conditions", ra_name="telRA", decl_name="telDec", source_units="radians", ) return data_source def update_telescope_marker_bokeh_model(self): """Update the telescope pointing data source.""" if "telescope_marker" not in self.data_sources: return sphere_map = tuple(self.sphere_maps.values())[0] data_source = self.make_telescope_marker_data_source(sphere_map) data = dict(data_source.data) if "telescope_marker" in self.data_sources: self.data_sources["telescope_marker"].data = data def make_survey_marker_data_source(self, sphere_map=None, max_fields=50): """Create a bokeh datasource for the pointings for the current survey. Parameters ---------- sphere_map: `schedview.plot.SphereMap` The instance of SphereMap to use to create the data source max_fields: `int` Maximum number of fields to display (none shown if the scheduler has more fields.) Returns ------- data_source: `bokeh.models.ColumnDataSource` The DataSource with the column data. """ survey = self.scheduler.survey_lists[self.survey_index[0]][self.survey_index[1]] try: ra_name = "ra" if len(survey.ra) <= max_fields else "" decl_name = "dec" if len(survey.dec) <= max_fields else "" except AttributeError: ra_name = "" decl_name = "" data_source = self._make_marker_data_source( sphere_map=sphere_map, name="Field", source_name="survey", ra_name=ra_name, decl_name=decl_name, source_units="radians", ) return data_source def update_survey_marker_bokeh_model(self): """Update the survey pointing data source.""" if "survey_marker" not in self.data_sources: return sphere_map = tuple(self.sphere_maps.values())[0] data_source = self.make_survey_marker_data_source(sphere_map) data = dict(data_source.data) if "survey_marker" in self.data_sources: self.data_sources["survey_marker"].data = data def update_healpix_bokeh_model(self): """Update the healpix value data source.""" if "healpix" not in self.data_sources: return LOGGER.debug("Updating helpix bokeh models") sphere_map = tuple(self.sphere_maps.values())[0] # sphere_map = ArmillarySphere(mjd=self.conditions.mjd) if "Zenith_shadow_mask" in self.map_keys: zenith_mask = self.healpix_maps["Zenith_shadow_mask"] cmap_sample_data = self.healpix_values[zenith_mask == 1] elif "y_sky" in self.map_keys: sb_mask = np.isfinite(self.healpix_maps["y_sky"]) cmap_sample_data = self.healpix_values[sb_mask] if len(cmap_sample_data) == 0: # It's probably day, so the color map will be bad regardless. cmap_sample_data = self.healpix_values else: cmap_sample_data = self.healpix_values self.healpix_cmap = make_zscale_linear_cmap(cmap_sample_data) new_ds = sphere_map.make_healpix_data_source( self.healpix_values, nside=self.nside, bound_step=1, ) new_data = dict(new_ds.data) for key in self.map_keys: # The datasource might not have all healpixels # or have them in the same order # so force the order by indexing on new_data["hpid"] new_data[key] = self.healpix_maps[key][new_data["hpid"]] # Replace the data to be shown self.data_sources["healpix"].data = new_data for sphere_map in self.sphere_maps.values(): sphere_map.healpix_glyph.fill_color = self.healpix_cmap sphere_map.healpix_glyph.line_color = self.healpix_cmap def update_hovertool_bokeh_model(self): """Update the hovertool with available value.""" if "hover_tool" not in self.bokeh_models: return tooltips = [] data = self.data_sources["healpix"].data for data_key in data.keys(): if not isinstance(data[data_key][0], collections.abc.Sequence): if data_key == "center_ra": label = "RA" elif data_key == "center_decl": label = "Decl" else: label = data_key.replace("_", " ") reference = f"@{data_key}" tooltips.append((label, reference)) self.bokeh_models["hover_tool"].tooltips = tooltips def make_reward_table(self): """Create the bokeh model for a table of rewards.""" # Bokeh's DataTable doesn't like to expand to accommodate extra rows, # so create a dummy with lots of rows initially. df = pd.DataFrame( np.nan, index=range(30), columns=[ "basis_function", "feasible", "max_basis_reward", "max_accum_reward", ], ) self.bokeh_models["reward_table"] = bokeh.models.DataTable( source=bokeh.models.ColumnDataSource(df), columns=[bokeh.models.TableColumn(field=c, title=c) for c in df], ) def update_reward_table_bokeh_model(self): """Update the bokeh model for the table of rewards.""" if "reward_table" in self.bokeh_models: survey = self.scheduler.survey_lists[self.survey_index[0]][self.survey_index[1]] reward_df = make_survey_reward_df(survey, self.conditions) any_bad_urls = False for doc_url in reward_df["doc_url"].values: if doc_url is None or "http" not in doc_url: any_bad_urls = True break if any_bad_urls: basis_function_formatter = bokeh.models.widgets.HTMLTemplateFormatter( template="Not a basis real function" ) else: basis_function_formatter = bokeh.models.widgets.HTMLTemplateFormatter( template='<a href="<%= doc_url %>" target="_blank"><%= value %></a>' ) self.bokeh_models["reward_table"].source = bokeh.models.ColumnDataSource(reward_df) new_columns = [] for column_name in reward_df.columns: if column_name == "basis_function": new_column = bokeh.models.TableColumn( field=column_name, title=column_name, formatter=basis_function_formatter, ) elif column_name == "doc_url": continue else: new_column = bokeh.models.TableColumn(field=column_name, title=column_name) new_columns.append(new_column) self.bokeh_models["reward_table"].columns = new_columns def make_status_indicator(self): """Create an indicator of what the visualization app is doing.""" self.bokeh_models["status_indicator"] = bokeh.models.Div(text="<p></p>") def make_chosen_survey(self): """Create the bokeh model for text showing the chosen survey.""" self.bokeh_models["chosen_survey"] = bokeh.models.Div(text="<p>No chosen survey</p>") def update_chosen_survey_bokeh_model(self): """Update the bokeh model for text showing the chosen survey.""" if ( "chosen_survey" in self.bokeh_models and len(self.scheduler.survey_index) == 2 and self.scheduler.survey_index[0] is not None and self.scheduler.survey_index[1] is not None ): tier = f"tier {self.scheduler.survey_index[0]}" survey = self._unique_survey_name() self.bokeh_models["chosen_survey"].text = f"<p>Chosen survey: {tier}, {survey}</p>" def make_displayed_value_metadata(self): """Create the bokeh model specifying what values are displayed.""" self.bokeh_models["displayed_value_metadata"] = bokeh.models.Div(text="<p>No displayed values</p>") def update_displayed_value_metadata_bokeh_model(self): """Update the bokeh model specifying what values are displayed.""" if "displayed_value_metadata" in self.bokeh_models: tier = f"tier {self.survey_index[0]}" survey = self._unique_survey_name() self.bokeh_models["displayed_value_metadata"].text = ( f"<p>Displayed value: {self.map_key} from {tier}, {survey}</p>" ) def make_time_display(self): """Create the bokeh model showing what time is being represented.""" self.bokeh_models["time_display"] = bokeh.models.Div(text="<p>No time.</p>") def update_time_display_bokeh_model(self): """Update the bokeh model showing what time is being represented.""" iso_time = Time(self.mjd, format="mjd", scale="utc").iso if "time_display" in self.bokeh_models: self.bokeh_models["time_display"].text = f"<p>{iso_time}</p>" def make_scheduler_summary_df(self): """Summarize the reward from each scheduler Returns ------- survey_df : `pandas.DataFrame` A table showing the reword for each feasible survey, and the basis functions that result in it being infeasible for the rest. """ survey_df = make_scheduler_summary_df(self.scheduler, self.conditions) return survey_df def make_reward_summary_table(self): """Create the bokeh model of the table of rewards.""" # Bokeh's DataTable doesn't like to expand to accommodate extra rows, # so create a dummy with lots of rows initially. df = pd.DataFrame( np.nan, index=range(300), columns=["tier", "survey_name", "reward"], ) self.data_sources["reward_summary_table"] = bokeh.models.ColumnDataSource(df) self.bokeh_models["reward_summary_table"] = bokeh.models.DataTable( source=self.data_sources["reward_summary_table"], columns=[bokeh.models.TableColumn(field=c, title=c) for c in df], ) self.update_reward_summary_table_bokeh_model() def update_reward_summary_table_bokeh_model(self): """Update the bokeh model of the table of rewards.""" LOGGER.info("Updating reward summary table bokeh model") if "reward_summary_table" in self.bokeh_models: scheduler_summary_df = self.make_scheduler_summary_df() def to_sigfig(x): try: value = float(x) except ValueError: return x return float("{:.5g}".format(value)) scheduler_summary_df["reward"] = scheduler_summary_df["reward"].apply(to_sigfig) # Get URLs for survey documentation # Flatten the list of lists of surveys into one long list surveys = itertools.chain.from_iterable(self.scheduler.survey_lists) survey_class_names = [ "rubin_scheduler.scheduler.surveys." + s.__class__.__name__ for s in surveys ] survey_doc_url = [f"https://rubin-sim.lsst.io/api/{cn}.html#{cn}" for cn in survey_class_names] survey_name_formatter = bokeh.models.widgets.HTMLTemplateFormatter( template='<a href="<%= doc_url %>" target="_blank"><%= value %></a>' ) scheduler_summary_df["doc_url"] = survey_doc_url self.bokeh_models["reward_summary_table"].source.data = dict( bokeh.models.ColumnDataSource(scheduler_summary_df).data ) new_columns = [] for column_name in scheduler_summary_df.columns: if column_name == "survey_name": new_column = bokeh.models.TableColumn( field=column_name, title=column_name, formatter=survey_name_formatter, ) elif column_name == "doc_url": continue else: new_column = bokeh.models.TableColumn(field=column_name, title=column_name) new_columns.append(new_column) self.bokeh_models["reward_summary_table"].columns = new_columns def make_figure(self): """Create a bokeh figures showing sky maps for scheduler behavior. Returns ------- fig : `bokeh.models.layouts.LayoutDOM` A bokeh figure that can be displayed in a notebook (e.g. with ``bokeh.io.show``) or used to create a bokeh app. """ self.make_sphere_map( "armillary_sphere", ArmillarySphere, "Armillary Sphere", frame_width=512, frame_height=512, decorate=True, ) self.bokeh_models["alt_slider"] = self.sphere_maps["armillary_sphere"].sliders["alt"] self.bokeh_models["az_slider"] = self.sphere_maps["armillary_sphere"].sliders["az"] self.bokeh_models["mjd_slider"] = self.sphere_maps["armillary_sphere"].sliders["mjd"] self.bokeh_models["mjd_slider"].visible = False self.make_sphere_map( "planisphere", Planisphere, "Planisphere", frame_width=512, frame_height=512, decorate=True, ) self.make_sphere_map( "altaz", HorizonMap, "Alt Az", frame_width=512, frame_height=512, decorate=False, horizon_graticules=True, ) self.make_sphere_map( "mollweide", MollweideMap, "Mollweide", frame_width=512, frame_height=512, decorate=True, ) self.bokeh_models["key"] = bokeh.models.Div(text=self.key_markup) self.make_time_display() self.make_displayed_value_metadata() self.bokeh_models["reward_table_title"] = bokeh.models.Div( text="<h2>Basis functions for displayed survey</h2>" ) self.make_reward_table() self.bokeh_models["reward_summary_table_title"] = bokeh.models.Div( text="<h2>Rewards for all survey schedulers</h2>" ) self.make_reward_summary_table() self.make_chosen_survey() arm_controls = [ self.bokeh_models["alt_slider"], self.bokeh_models["az_slider"], ] figure = bokeh.layouts.row( bokeh.layouts.column( self.bokeh_models["altaz"], self.bokeh_models["time_display"], self.bokeh_models["displayed_value_metadata"], self.bokeh_models["chosen_survey"], self.bokeh_models["reward_table_title"], self.bokeh_models["reward_table"], self.bokeh_models["reward_summary_table_title"], self.bokeh_models["reward_summary_table"], ), bokeh.layouts.column( self.bokeh_models["planisphere"], self.bokeh_models["key"], self.bokeh_models["mollweide"], self.bokeh_models["armillary_sphere"], *arm_controls, ), ) return figure def update_bokeh_models(self): """Update all bokeh models with current data.""" LOGGER.debug("Updating bokeh data models.") self.update_reward_table_bokeh_model() self.update_reward_summary_table_bokeh_model() self.update_healpix_bokeh_model() self.update_hovertool_bokeh_model() self.update_telescope_marker_bokeh_model() self.update_moon_marker_bokeh_model() self.update_sun_marker_bokeh_model() self.update_survey_marker_bokeh_model() self.update_time_display_bokeh_model() self.update_displayed_value_metadata_bokeh_model() self.update_chosen_survey_bokeh_model() @property def figure(self): """Return a figure for this display. Returns ------- figure : `bokeh.models.layouts.LayoutDOM` A bokeh figure that can be displayed in a notebook (e.g. with ``bokeh.io.show``) or used to create a bokeh app. """ if self._figure is None: self._figure = self.make_figure() return self._figure class SchedulerNotebookDisplay(SchedulerDisplay): def __init__(self, *args, **kwargs): # docstring in parent class # notebook_handle must be initialized so overridden methods # called by the parent __init__ will run. self.notebook_handle = None super().__init__(*args, **kwargs) def _set_conditions(self, conditions): super()._set_conditions(conditions) self.update_display() def select_tier(self, tier): # docstring in parent class super().select_tier(tier) self.update_display() def select_survey(self, survey): # docstring in parent class super().select_survey(survey) self.update_display() def select_value(self, value): # docstring in parent class super().select_value(value) self.update_display() def update_display(self): """Update the display.""" if self.notebook_handle is not None: self.update_bokeh_models() bokeh.io.push_notebook(handle=self.notebook_handle) def show(self): """Show the display.""" self.notebook_handle = bokeh.io.show(self.figure, notebook_handle=True)