Source code for schedview.collect.logdb
import requests
import schedview.clientsite
from schedview.collect.auth import get_auth
MAX_RETRIES = 2
[docs]
def get_from_logdb_with_retries(channel: str, params: dict) -> list[dict]:
"""Retrieve log messages, with retries.
Parameters
----------
channel : `str`
The channel from which to retrieve log messages.
params : `dict`
Parameters passed to the REST URI.
Returns
-------
result: `list[dict]`
The log messages.
"""
try:
# Note that get_auth is cached, so it does not actually read the
# token every time.
auth = get_auth()
except ValueError:
auth = ("user", None)
api_endpoint = f"{schedview.clientsite.DATASOURCE_BASE_URL}{channel}"
response = requests.get(api_endpoint, auth=auth, params=params)
try_number = 1
while not response.status_code == 200:
if try_number > MAX_RETRIES:
response.raise_for_status()
response = requests.get(api_endpoint, auth=auth, params=params)
try_number += 1
return response.json()