import bokeh
import healpy as hp
import numpy as np
import pandas as pd
from astropy.time import Time
# Imported to help sphinx make the link
from rubin_scheduler.scheduler.model_observatory.model_observatory import ModelObservatory
from rubin_scheduler.scheduler.schedulers import CoreScheduler # noqa F401
from uranography.api import ArmillarySphere, Planisphere, split_healpix_by_resolution
import schedview.collect.scheduler_pickle
import schedview.compute.astro
from schedview.collect.stars import load_bright_stars
from schedview.compute.camera import LsstCameraFootprintPerimeter
BAND_COLORS = dict(u="#56b4e9", g="#008060", r="#ff4000", i="#850000", z="#6600cc", y="#222222")
BAND_HATCH_PATTERNS = dict(
u="dot",
g="ring",
r="horizontal_line",
i="vertical_line",
z="right_diagonal_line",
y="left_diagonal_line",
)
BAND_HATCH_SCALES = dict(u=6, g=6, r=6, i=6, z=12, y=12)
NSIDE_LOW = 8
[docs]
def plot_visit_skymaps(
visits,
footprint,
conditions,
hatch=False,
fade_scale=2.0 / (24 * 60),
camera_perimeter="LSST",
nside_low=8,
show_stars=False,
):
"""Plot visits on a map of the sky.
Parameters
----------
visits : `pandas.DataFrame`
One row per visit, with at least the following columns:
``"fieldRA"``
The visit R.A. in degrees (`float`).
``"fieldDec"``
The visit declination in degrees (`float`).
``"observationStartMJD"``
The visit start MJD (`float`).
``"filter"``
The visit filter (`str`)
footprint : `numpy.array`
A healpix map of the footprint.
conditions : `rubin_scheduler.scheduler.features.conditions.Conditions`
The conditions for the night, which determines the start and end
times covered by the map.
hatch : `bool`
Use hatches instead of filling visit polygons. (SLOW!)
fade_scale : `float`
Time (in days) over which visit outlines fade.
camera_perimeter : `str` or `object`
An function that returns the perimeter of the camera footprint,
or "LSST" for the LSST camera footprint.
Defaults to "LSST".
nside_low : `int`
The healpix nside to try to use for low resolution sections of the
healpix map.
show_stars : `bool`
Show stars? (Defaults to False)
Returns
-------
_type_
_description_
"""
if camera_perimeter == "LSST":
camera_perimeter = LsstCameraFootprintPerimeter()
nside_low = NSIDE_LOW
asphere = ArmillarySphere(mjd=conditions.mjd)
psphere = Planisphere(mjd=conditions.mjd)
psphere.sliders["mjd"] = asphere.sliders["mjd"]
cmap = bokeh.transform.linear_cmap("value", "Greys256", int(np.ceil(np.nanmax(footprint) * 2)), 0)
nside_high = hp.npix2nside(footprint.shape[0])
footprint_high, footprint_low = split_healpix_by_resolution(footprint, nside_low, nside_high)
healpix_high_ds, cmap, glyph = asphere.add_healpix(footprint_high, nside=nside_high, cmap=cmap)
psphere.add_healpix(healpix_high_ds, nside=nside_high, cmap=cmap)
healpix_low_ds, cmap, glyph = asphere.add_healpix(footprint_low, nside=nside_low, cmap=cmap)
psphere.add_healpix(healpix_low_ds, nside=nside_low, cmap=cmap)
# Transforms for recent, past, future visits
past_future_js = """
const result = new Array(xs.length)
for (let i = 0; i < xs.length; i++) {
if (mjd_slider.value >= xs[i]) {
result[i] = past_value
} else {
result[i] = future_value
}
}
return result
"""
past_future_transform = bokeh.models.CustomJSTransform(
args=dict(mjd_slider=asphere.sliders["mjd"], past_value=0.5, future_value=0.0),
v_func=past_future_js,
)
recent_js = """
const result = new Array(xs.length)
for (let i = 0; i < xs.length; i++) {
if (mjd_slider.value < xs[i]) {
result[i] = 0
} else {
result[i] = Math.max(0, max_value * (1 - (mjd_slider.value - xs[i]) / scale))
}
}
return result
"""
recent_transform = bokeh.models.CustomJSTransform(
args=dict(mjd_slider=asphere.sliders["mjd"], max_value=1.0, scale=fade_scale),
v_func=recent_js,
)
for band in "ugrizy":
band_visits = visits.query(f"filter == '{band}'")
if len(band_visits) < 1:
continue
ras, decls = camera_perimeter(band_visits.fieldRA, band_visits.fieldDec, band_visits.rotSkyPos)
perimeter_df = pd.DataFrame(
{
"ra": ras,
"decl": decls,
"mjd": band_visits.observationStartMJD.values,
}
)
patches_kwargs = dict(
fill_alpha=bokeh.transform.transform("mjd", past_future_transform),
line_alpha=bokeh.transform.transform("mjd", recent_transform),
line_color="#ff00ff",
line_width=2,
)
if hatch:
patches_kwargs.update(
dict(
fill_alpha=0,
fill_color=None,
hatch_alpha=bokeh.transform.transform("mjd", past_future_transform),
hatch_color=BAND_COLORS[band],
hatch_pattern=BAND_HATCH_PATTERNS[band],
hatch_scale=BAND_HATCH_SCALES[band],
)
)
else:
patches_kwargs.update(dict(fill_color=BAND_COLORS[band]))
visit_ds = asphere.add_patches(
perimeter_df,
patches_kwargs=patches_kwargs,
)
psphere.add_patches(data_source=visit_ds, patches_kwargs=patches_kwargs)
asphere.decorate()
psphere.decorate()
sun_ds = asphere.add_marker(
ra=np.degrees(conditions.sun_ra),
decl=np.degrees(conditions.sun_dec),
name="Sun",
glyph_size=15,
circle_kwargs={"color": "yellow", "fill_alpha": 1},
)
psphere.add_marker(
data_source=sun_ds,
name="Sun",
glyph_size=15,
circle_kwargs={"color": "yellow", "fill_alpha": 1},
)
moon_ds = asphere.add_marker(
ra=np.degrees(conditions.moon_ra),
decl=np.degrees(conditions.moon_dec),
name="Moon",
glyph_size=15,
circle_kwargs={"color": "orange", "fill_alpha": 0.8},
)
psphere.add_marker(
data_source=moon_ds,
name="Moon",
glyph_size=15,
circle_kwargs={"color": "orange", "fill_alpha": 0.8},
)
if show_stars:
star_data = load_bright_stars().loc[:, ["name", "ra", "decl", "Vmag"]]
star_data["glyph_size"] = 15 - (15.0 / 3.5) * star_data["Vmag"]
star_data.query("glyph_size>0", inplace=True)
star_ds = asphere.add_stars(star_data, mag_limit_slider=False, star_kwargs={"color": "yellow"})
psphere.add_stars(
star_data,
data_source=star_ds,
mag_limit_slider=True,
star_kwargs={"color": "yellow"},
)
horizon_ds = asphere.add_horizon()
psphere.add_horizon(data_source=horizon_ds)
horizon70_ds = asphere.add_horizon(zd=70, line_kwargs={"color": "red", "line_width": 2})
psphere.add_horizon(data_source=horizon70_ds, line_kwargs={"color": "red", "line_width": 2})
asphere.sliders["mjd"].start = conditions.sun_n12_setting
asphere.sliders["mjd"].end = conditions.sun_n12_rising
fig = bokeh.layouts.row(
asphere.figure,
psphere.figure,
)
return fig
[docs]
def plot_visit_planisphere(
visits,
footprint,
conditions,
hatch=False,
fade_scale=2.0 / (24 * 60),
camera_perimeter="LSST",
nside_low=8,
show_stars=False,
):
"""Plot visits on a map of the sky.
Parameters
----------
visits : `pandas.DataFrame`
One row per visit, with at least the following columns:
``"fieldRA"``
The visit R.A. in degrees (`float`).
``"fieldDec"``
The visit declination in degrees (`float`).
``"observationStartMJD"``
The visit start MJD (`float`).
``"filter"``
The visit filter (`str`)
footprint : `numpy.array`
A healpix map of the footprint.
conditions : `rubin_scheduler.scheduler.features.conditions.Conditions`
The conditions for the night, which determines the start and end
times covered by the map.
hatch : `bool`
Use hatches instead of filling visit polygons. (SLOW!)
fade_scale : `float`
Time (in days) over which visit outlines fade.
camera_perimeter : `str` or `object`
An function that returns the perimeter of the camera footprint,
or "LSST" for the LSST camera footprint.
Defaults to "LSST".
nside_low : `int`
The healpix nside to try to use for low resolution sections of the
healpix map.
show_stars : `bool`
Show stars? (Defaults to False)
Returns
-------
_type_
_description_
"""
if camera_perimeter == "LSST":
camera_perimeter = LsstCameraFootprintPerimeter()
nside_low = NSIDE_LOW
plot = bokeh.plotting.figure(
frame_width=1024,
frame_height=1024,
match_aspect=True,
title="Visits",
)
psphere = Planisphere(mjd=conditions.mjd, plot=plot)
psphere.add_mjd_slider()
cmap = bokeh.transform.linear_cmap("value", "Greys256", int(np.ceil(np.nanmax(footprint) * 2)), 0)
nside_high = hp.npix2nside(footprint.shape[0])
footprint_high, footprint_low = split_healpix_by_resolution(footprint, nside_low, nside_high)
healpix_high_ds, cmap, glyph = psphere.add_healpix(footprint_high, nside=nside_high, cmap=cmap)
healpix_low_ds, cmap, glyph = psphere.add_healpix(footprint_low, nside=nside_low, cmap=cmap)
for band in "ugrizy":
band_visits = visits.query(f"filter == '{band}'")
if len(band_visits) < 1:
continue
ras, decls = camera_perimeter(band_visits.fieldRA, band_visits.fieldDec, band_visits.rotSkyPos)
mjd_start = band_visits.observationStartMJD.values
current_mjd = conditions.sun_n12_setting
perimeter_df = pd.DataFrame(
{
"ra": ras,
"decl": decls,
"min_mjd": band_visits.observationStartMJD.values,
"in_mjd_window": [0.3] * len(ras),
"fade_scale": [fade_scale] * len(ras),
"recent_mjd": np.clip((current_mjd - mjd_start) / fade_scale, 0, 1),
}
)
patches_kwargs = dict(line_alpha="recent_mjd", line_color="#ff00ff", line_width=2)
if hatch:
patches_kwargs.update(
dict(
fill_alpha=0,
fill_color=None,
hatch_alpha="in_mjd_window",
hatch_color=BAND_COLORS[band],
hatch_pattern=BAND_HATCH_PATTERNS[band],
hatch_scale=BAND_HATCH_SCALES[band],
)
)
else:
patches_kwargs.update(
dict(
fill_alpha="in_mjd_window",
fill_color=BAND_COLORS[band],
)
)
visit_ds = psphere.add_patches(
perimeter_df,
patches_kwargs=patches_kwargs,
)
psphere.set_js_update_func(visit_ds)
psphere.decorate()
psphere.add_marker(
ra=np.degrees(conditions.sun_ra),
decl=np.degrees(conditions.sun_dec),
name="Sun",
glyph_size=15,
circle_kwargs={"color": "yellow", "fill_alpha": 1},
)
psphere.add_marker(
ra=np.degrees(conditions.moon_ra),
decl=np.degrees(conditions.moon_dec),
name="Moon",
glyph_size=15,
circle_kwargs={"color": "orange", "fill_alpha": 0.8},
)
if show_stars:
star_data = load_bright_stars().loc[:, ["name", "ra", "decl", "Vmag"]]
star_data["glyph_size"] = 15 - (15.0 / 3.5) * star_data["Vmag"]
star_data.query("glyph_size>0", inplace=True)
psphere.add_stars(star_data, mag_limit_slider=False, star_kwargs={"color": "yellow"})
psphere.add_horizon()
psphere.add_horizon(zd=70, line_kwargs={"color": "red", "line_width": 2})
psphere.sliders["mjd"].start = conditions.sun_n12_setting
psphere.sliders["mjd"].end = conditions.sun_n12_rising
fig = bokeh.layouts.row(
psphere.figure,
)
return fig
[docs]
def create_visit_skymaps(
visits,
night_date,
nside=32,
observatory=None,
timezone="Chile/Continental",
planisphere_only=False,
):
"""Create a map of visits on the sky.
Parameters
----------
visits : `pandas.DataFrame` or `str`
If a `pandas.DataFrame`, it needs at least the following columns:
``"fieldRA"``
The visit R.A. in degrees (`float`).
``"fieldDec"``
The visit declination in degrees (`float`).
``"observationStartMJD"``
The visit start MJD (`float`).
``"filter"``
The visit filter (`str`)
If a string, the file name of the opsim database from which the
visits should be loaded.
night_date : `datetime.date`
The calendar date of the evening of the night for which
to plot the visits.
nside : `int`, optional
The healpix nside to use for the map.
observatory : `ModelObservatory`, optional
Provides the location of the observatory, used to compute
night start and end times.
By default None.
timezone : `str`, optional
by default "Chile/Continental"
planisphere_only : `bool`
by default False
Returns
-------
figure : `bokeh.models.layouts.LayoutDOM`
The figure itself.
data : `dict`
The arguments used to produce the figure using
`plot_visit_skymaps`.
"""
site = None if observatory is None else observatory.location
night_events = schedview.compute.astro.night_events(night_date=night_date, site=site, timezone=timezone)
start_time = Time(night_events.loc["sunset", "UTC"])
end_time = Time(night_events.loc["sunrise", "UTC"])
if isinstance(visits, str):
visits = schedview.collect.opsim.read_opsim(visits)
if start_time is not None:
visits.query(f"observationStartMJD >= {Time(start_time).mjd}", inplace=True)
if end_time is not None:
visits.query(f"observationStartMJD <= {Time(end_time).mjd}", inplace=True)
if observatory is None:
observatory = ModelObservatory(nside=nside, init_load_length=1)
observatory.sky_model.load_length = 1
footprint = schedview.collect.footprint.get_footprint(nside)
observatory.mjd = end_time.mjd
conditions = observatory.return_conditions()
data = {"visits": visits, "footprint": footprint, "conditions": conditions}
if planisphere_only:
vmap = schedview.plot.visitmap.plot_visit_planisphere(**data)
else:
vmap = schedview.plot.visitmap.plot_visit_skymaps(**data)
return vmap, data