diff --git a/.gitignore b/.gitignore
index 4d1af89..867e86a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -167,6 +167,7 @@ cython_debug/
 
 # local developer code
 dev_*.py
+dev_*/
 
 # Qt (on Windows)
 gemviz/resources/*.py
diff --git a/gemviz/README.md b/gemviz/README.md
new file mode 100644
index 0000000..f56a21d
--- /dev/null
+++ b/gemviz/README.md
@@ -0,0 +1,40 @@
+# notes
+
+Terse summary of responsibilities of the source modules.
+
+- app
+  - parse command-line options
+  - set up logging
+  - start the GUI (MainWindow)
+- MainWindow class
+  - creates the tiled client connection
+  - passes the client to the BRC_MVC class
+  - populates the QComboBox with available catalog names
+  - when a catalog is selected from the QComboBox, creates BRC_MVC
+- BRC_MVC class
+  - (re)created each time catalog is chosen:
+    - set self.catalog
+    - remove previous QTableView
+    - create QTableView with default page offset and size
+  - each time filters are changed or catalog is updated:
+    - Set page offset from self.selected_uid (if not None)
+    - update QTableView with fcat_md (page of filtered catalog metadata)
+  - call SelectFieldsWidget when run is selected
+  - call ChartView when requested by SelectFieldsWidget
+  - BRCTableView class
+    - page through a filtered CatalogOfBlueskyRuns
+    - update BRCTableModel as page parameters or filtered catalog are changed
+  - BRCTableModel class
+    - display one page of runs in a table
+    - select a run for examination
+    - when run is selected, update self.selected_uid (emit a signal)
+  - SelectFieldsWidget class
+    - populate the table of plottable fields
+    - identify default data to plot (if any) and check the boxes
+    - request to plot data as directed by buttons
+  - ChartView class
+    - Determine dimensionality (and type of plot)
+    - Only plots 1-D line charts now.
+    - TODO: Expand to 2-D mesh and 2-D image views.
+
+...
diff --git a/gemviz/analyze_run.py b/gemviz/analyze_run.py
deleted file mode 100644
index 5da553e..0000000
--- a/gemviz/analyze_run.py
+++ /dev/null
@@ -1,275 +0,0 @@
-"""
-Analyze a tiled run for its plottable data.
-
-.. autosummary::
-
-    ~SignalAxesFields
-"""
-
-import logging
-import warnings
-
-from . import tapi
-
-logger = logging.getLogger(__name__)
-DEFAULT_STREAM = "primary"
-
-
-class SignalAxesFields:
-    """
-    Identify the signal and axes data fields from the run.
-
-    .. autosummary::
-
-        ~to_dict
-        ~descriptors
-        ~hints
-        ~identify_axes
-        ~identify_detectors
-        ~identify_fields
-        ~identify_chart
-        ~object_names
-        ~object_name_to_fields
-    """
-
-    # runs with these exit_status probably have plottable data fields
-    status__with_data = """
-        abort
-        success
-    """.split()
-
-    # do not choose any of these fields as the default (NeXus-style plottable) signal data.
- not_signals = """ - timebase - preset_time - """.split() - - def __init__(self, run, default_stream=DEFAULT_STREAM) -> None: - self.run = run - - self._cleanup_motor_heuristic = False - self.plot_axes = [] - self.plot_signal = None - self.positioners = [] - self.stream_name = default_stream - self.fields = [] - self.detectors = [] - self.chart_type = None - - self._descriptors = None - - self.plan_name = tapi.get_md(run, "start", "plan_name") - self.scan_id = tapi.get_md(run, "start", "scan_id") - self.status = tapi.get_md(run, "stop", "exit_status") - self.time = tapi.get_md(run, "start", "time") - self.title = tapi.get_md(run, "start", "title") - self.uid = tapi.get_md(run, "start", "uid") - - if self.status in self.status__with_data: - self.identify_axes() # call first, redefines self.stream_name - - self.identify_detectors() - self.identify_fields() - self.identify_chart() - - def __repr__(self) -> str: - s = ( - f"uid7:{self.uid[:7]}" - f" ScanID:{self.scan_id}" - f" plan:{self.plan_name}" - f" status:{self.status}" - f" title:{self.title!r}" - ) - if self.plot_signal is not None: - s += ( - f" stream:{self.stream_name}" - f" signal:{self.plot_signal}" - f" axes:{self.plot_axes}" - f" detectors:{self.detectors}" - f" fields:{self.fields}" - ) - # fmt: off - if ( - ( - self.plot_signal is None - or self.positioners != self.plot_axes - ) - and len(self.positioners) - ): - s += f" all_dim_fields:{self.positioners}" - return s - # fmt: on - - def descriptors(self, stream=None, use_cache=True): - """Return the list of descriptor documents.""" - if use_cache and self._descriptors is not None: - # optimize slow process by cacheing - return self._descriptors - - # (re)discover the descriptors for this stream and cache them - run_stream = self.run.get(stream or self.stream_name) - if run_stream is None: - return [] # nothing that is plottable - - self._descriptors = run_stream.metadata.get("descriptors") or [] - return self._descriptors - - def hints(self, stream=None): - """Return the hints for this stream.""" - hints = {} - for descriptor in self.descriptors(stream=stream): - hints.update(descriptor.get("hints", {})) - return hints - - def identify_axes(self) -> None: - """Discover the motor (independent axis) fields.""" - hints = tapi.get_md(self.run, "start", "hints", {}) - motors = tapi.get_md(self.run, "start", "motors") - logger.debug("motors=%s", motors) - - # Prepare a guess about the dimensions (independent variables) in case - # we need it. - guess = ( - [(["time"], self.stream_name)] - if motors is None - else [([motor], self.stream_name) for motor in motors] - ) - logger.debug("guess=%s", guess) - - # Ues the guess if there is nothint about dimensions. - dimensions = hints.get("dimensions") - if dimensions is None: - self._cleanup_motor_heuristic = True - dimensions = guess - - # We can only cope with all the dimensions belonging to the same - # stream unless we resample. We are not doing to handle that yet. - if len(set(d[1] for d in dimensions)) != 1: - self._cleanup_motor_heuristic = True - dimensions = guess # Fall back on our GUESS. - warnings.warn( - "We are ignoring the dimensions hinted because we cannot combine streams." - ) - - logger.debug("dimensions=%s", dimensions) - - # for each dimension, choose one field only - # the plan can supply a list of fields. 
It's assumed the first - # of the list is always the one plotted against - self.plot_axes = [ - fields[0] for fields, stream_name in dimensions if len(fields) > 0 - ] - logger.debug("plot_axes=%s", self.plot_axes) - - # make distinction between flattened fields and plotted fields - # motivation for this is that when plotting, we find dependent variable - # by finding elements that are not independent variables - self.positioners = [ - field for fields, stream_name in dimensions for field in fields - ] - - _, self.stream_name = dimensions[0] - - def identify_chart(self) -> None: - """Identify the type of chart fot this run's data.""" - rank = len(self.plot_axes) - if rank == 1 and self.plot_signal is not None: - shape = self.descriptors()[0]["data_keys"][self.plot_signal]["shape"] - if len(shape) in (0, 1): - n_events = tapi.get_md(self.run, "stop", "num_events", {}) - events = n_events.get(self.stream_name) - if events > 1: - self.chart_type = "line_1D" - elif len(shape) in (2, 3): - self.chart_type = f"unknown{len(shape)}D" - elif rank == 2 and self.plot_signal is not None: - hints = tapi.get_md(self.run, "start", "hints", {}) - gridding = hints.get("gridding") - self.chart_type = "grid_2D" if gridding == "rectilinear" else "scatter_2D" - else: - self.chart_type = None - - def identify_detectors(self) -> None: - """ - Discover the list of detectors defined by the run. - - Return the fields (the names of the actual data), not the object names. - """ - - def is_numeric(detector, descriptor): - dtype = descriptor["data_keys"][detector]["dtype"] - if dtype == "array": - ntype = self.run[self.stream_name]["data"][detector].dtype.name - if ntype.startswith("int") or ntype.startswith("float"): - dtype = "number" - return dtype == "number" - - detectors = [] - for det_name in tapi.get_md(self.run, "start", "detectors", []): - detectors.extend(self.object_name_to_fields(det_name)) - - # fmt: off - self.detectors = [ - det # only numeric data - for det in detectors - for descriptor in self.descriptors(stream=self.stream_name) - if is_numeric(det, descriptor) - ] - - def identify_fields(self) -> None: - """ - Discover the data (both dependent and independent axis) fields. - - We will see if the object_names hint at whether a subset of their data - keys ("fields") are interesting. Use them if they are hinted. - Otherwise, we know that the RunEngine *always* records the complete list - of fields in each stream, so we can use them all unselectively. - """ - for obj_name in self.object_names(): - try: - fields = self.hints().get(obj_name, {})["fields"] - except KeyError: - fields = self.object_name_to_fields(obj_name) - self.fields.extend(fields) - - # identify the first plottable field, NeXus uses this, for example - names_to_avoid = self.positioners + self.not_signals - possible_signals = self.detectors + self.fields - for field in possible_signals: - if field not in names_to_avoid: - self.plot_signal = field - break - - def object_names(self, stream=None): - """Return the names of objects used in the run.""" - obj_names = [] - for descriptor in self.descriptors(stream=stream): - obj_names.extend(list(descriptor["object_keys"])) - return obj_names - - def object_name_to_fields(self, obj_name, stream=None): - """ - Return the fields for a given object name. - - The run may have recorded data identified by either the name of the - ``ophyd.Signal`` (field) or the name of the ``ophyd.Device`` (object). 
- """ - fields = [] - for descriptor in self.descriptors(stream=stream): - fields.extend(descriptor["object_keys"].get(obj_name, [])) - return fields - - def to_dict(self) -> dict: - """Return the essential results in a dictionary""" - return { - "scan_id": self.scan_id, - "plan": self.plan_name, - "chart_type": self.chart_type, - "stream": self.stream_name, - "rank": len(self.plot_axes), - "uid7": self.uid[:7], - "plot_signal": self.plot_signal, - "plot_axes": self.plot_axes, - "detectors": self.detectors, - "positioners": self.positioners, - } diff --git a/gemviz/bluesky_runs_catalog.py b/gemviz/bluesky_runs_catalog.py index e77070d..8b9afcc 100644 --- a/gemviz/bluesky_runs_catalog.py +++ b/gemviz/bluesky_runs_catalog.py @@ -18,6 +18,9 @@ from . import tapi from . import utils +PAGE_START = -1 +PAGE_SIZE = 10 + class BRC_MVC(QtWidgets.QWidget): """MVC class for CatalogOfBlueskyRuns.""" @@ -39,22 +42,22 @@ def setup(self): from .bluesky_runs_catalog_table_view import BRCTableView from .user_settings import settings + self.selected_run_uid = None + self.brc_search_panel = BRCSearchPanel(self) layout = self.filter_groupbox.layout() layout.addWidget(self.brc_search_panel) self.brc_search_panel.setupCatalog(self.catalogName()) - self.brc_tableview = BRCTableView(self) + self.brc_tableview = BRCTableView(self, self.catalog(), PAGE_START, PAGE_SIZE) layout = self.runs_groupbox.layout() layout.addWidget(self.brc_tableview) - self.brc_tableview.displayTable() self.brc_run_viz = BRCRunVisualization(self) layout = self.viz_groupbox.layout() layout.addWidget(self.brc_run_viz) # connect search signals with tableview update - # fmt: off widgets = [ [self.brc_search_panel.plan_name, "returnPressed"], [self.brc_search_panel.scan_id, "returnPressed"], @@ -63,9 +66,8 @@ def setup(self): [self.brc_search_panel.detectors, "returnPressed"], [self.brc_search_panel.date_time_widget.apply, "released"], ] - # fmt: on for widget, signal in widgets: - getattr(widget, signal).connect(self.brc_tableview.displayTable) + getattr(widget, signal).connect(self.refreshFilteredCatalogView) self.brc_tableview.run_selected.connect(self.doRunSelectedSlot) @@ -88,10 +90,16 @@ def doPlotSlot(self, run, stream_name, action, selections): from .select_stream_fields import to_datasets # TODO: make the plots configurable - scan_id = tapi.get_md(run, "start", "scan_id") + scan_id = run.get_run_md("start", "scan_id") # setup datasets - datasets, options = to_datasets(run[stream_name], selections, scan_id=scan_id) + try: + datasets, options = to_datasets( + run, stream_name, selections, scan_id=scan_id + ) + except ValueError as exc: + self.setStatus(f"No plot: {exc}") + return # get the chartview widget, if exists layout = self.brc_run_viz.plotPage.layout() @@ -112,21 +120,64 @@ def doPlotSlot(self, run, stream_name, action, selections): self.brc_run_viz.setPlot(widget) def doRunSelectedSlot(self, run): - """Slot: run is clicked in the table view.""" + """ + Slot: run is clicked in the table view. 
+ + run *object*: + Instance of ``tapi.RunMetadata`` + """ from functools import partial - from .select_stream_fields import SelectStreamsWidget + from .select_stream_fields import SelectFieldsWidget - self.brc_run_viz.setMetadata(yaml.dump(dict(run.metadata), indent=4)) - self.brc_run_viz.setData(tapi.run_description_table(run)) - self.setStatus(tapi.run_summary(run)) + run_md = run.run_md + self.brc_run_viz.setMetadata(yaml.dump(dict(run_md), indent=4)) + self.brc_run_viz.setData(self.getDataDescription(run)) + self.setStatus(run.summary()) + self.selected_run_uid = run.get_run_md("start", "uid") - widget = SelectStreamsWidget(self, run) + widget = SelectFieldsWidget(self, run) widget.selected.connect(partial(self.doPlotSlot, run)) layout = self.fields_groupbox.layout() utils.removeAllLayoutWidgets(layout) layout.addWidget(widget) + def getDataDescription(self, run): + """Provide text description of the data streams in the run.""" + import pyRestTable + + # Describe what will be plotted. Show in the viz panel "Data" tab. + analysis = run.plottable_signals() + table = pyRestTable.Table() + table.labels = "item description".split() + table.addRow(("scan", run.get_run_md("start", "scan_id"))) + table.addRow(("plan", run.get_run_md("start", "plan_name"))) + if analysis["plot_signal"] is not None: + table.addRow(("stream", analysis["stream"])) + table.addRow(("plot signal", analysis["plot_signal"])) + table.addRow(("plot axes", ", ".join(analysis["plot_axes"]))) + table.addRow(("all detectors", ", ".join(analysis["detectors"]))) + table.addRow(("all positioners", ", ".join(analysis["motors"]))) + text = "plot summary" + text += "\n" + "-" * len(text) + "\n" * 2 + text += f"{table.reST()}\n" + + # Show information about each stream. + rows = [] + for sname in run.stream_metadata(): + title = f"stream: {sname}" + # row = [title, "-" * len(title), str(run.stream_data(sname)), ""] + rows += [title, "-" * len(title), str(run.stream_data(sname)), ""] + + text += "\n".join(rows).strip() + return text + + def refreshFilteredCatalogView(self, *args, **kwargs): + """Update the view with the new filtered catalog.""" + # print(f"{__name__}.{__class__.__name__} {args=} {kwargs=}") + filtered_catalog = self.brc_search_panel.filteredCatalog() + self.brc_tableview.setCatalog(filtered_catalog) + def splitter_moved(self, key, *arg, **kwargs): thread = getattr(self, f"{key}_wait_thread", None) setattr(self, f"{key}_deadline", time.time() + self.motion_wait_time) diff --git a/gemviz/bluesky_runs_catalog_search.py b/gemviz/bluesky_runs_catalog_search.py index 3ad674d..2937a53 100644 --- a/gemviz/bluesky_runs_catalog_search.py +++ b/gemviz/bluesky_runs_catalog_search.py @@ -31,18 +31,21 @@ def catalog(self): return self.parent.catalog() def setupCatalog(self, catalog_name, *args, **kwargs): - from .date_time_range_slider import DAY + from .utils import DAY def getStartTime(uid): - return utils.ts2iso(tapi.get_md(cat[uid], "start", "time")) + md = cat[uid].metadata + ts = (md.get("start") or {}).get("time") + return utils.ts2iso(ts) cat = self.catalog() if len(cat) == 0: self.setStatus(f"Catalog {catalog_name!r} has no runs.") return + keys = cat.keys() start_times = [ - getStartTime(cat.keys().first()), - getStartTime(cat.keys().last()), + getStartTime(keys.first()), + getStartTime(keys.last()), ] t_low = min(start_times) t_high = max(start_times) @@ -72,13 +75,9 @@ def filteredCatalog(self): try: cat = tapi.get_tiled_runs(cat, scan_id=int(scan_id)) except ValueError: - self.setStatus("Invalid entry: scan_id must 
be an integer.") - pass - # TODO: PR #145 https://github.com/BCDA-APS/gemviz/pull/145 - # after updating tiled is updated (issue #53), we should try this: - # import tiled.catalogs - # empty_catalog = tiled.catalogs.Catalog.from_dict({}) - # return empty_catalog + self.setStatus( + f"Invalid entry: scan_id must be an integer. Received {scan_id=!r}" + ) motors = self.positioners.text().strip() if len(motors) > 0: diff --git a/gemviz/bluesky_runs_catalog_table_model.py b/gemviz/bluesky_runs_catalog_table_model.py index 7fa4534..03bae7e 100644 --- a/gemviz/bluesky_runs_catalog_table_model.py +++ b/gemviz/bluesky_runs_catalog_table_model.py @@ -8,20 +8,13 @@ ~BRCTableModel """ -import datetime import logging -import pyRestTable -import yaml from PyQt5 import QtCore from PyQt5 import QtGui -from . import analyze_run -from . import tapi +from . import utils -logger = logging.getLogger(__name__) -DEFAULT_PAGE_SIZE = 5 -DEFAULT_PAGE_OFFSET = 0 BGCLUT = { # BackGround Color Lookup Table "success": None, # mark background color of unsuccessful runs @@ -30,64 +23,71 @@ # other, such as None (when no stop document) "other": QtGui.QColor(0xE2E2EC), # light blue/grey } +logger = logging.getLogger(__name__) class BRCTableModel(QtCore.QAbstractTableModel): - """Bluesky catalog for QtCore.QAbstractTableModel.""" + """Page of Bluesky catalog runs.""" + + def __init__(self, parent): + self.parent = parent # QTableView + self.runs = {} + + def get_str_list(run, doc, key): + return ", ".join(run.get_run_md(doc, key, [])) - def __init__(self, data): self.actions_library = { - "Scan ID": lambda run: tapi.get_md(run, "start", "scan_id"), - "Plan Name": lambda run: tapi.get_md(run, "start", "plan_name"), - "Positioners": lambda run: self.get_str_list(run, "start", "motors"), - "Detectors": lambda run: self.get_str_list(run, "start", "detectors"), - "#points": lambda run: tapi.get_md(run, "start", "num_points"), - "Date": self.get_run_start_time, - "Status": lambda run: tapi.get_md(run, "stop", "exit_status"), - "Streams": lambda run: self.get_str_list(run, "summary", "stream_names"), - # "uid": lambda run: tapi.get_md(run, "start", "uid"), - # "uid7": self.get_run_uid7, + "Scan ID": lambda run: run.get_run_md("start", "scan_id"), + "Plan Name": lambda run: run.get_run_md("start", "plan_name"), + "Positioners": lambda run: get_str_list(run, "start", "motors"), + "Detectors": lambda run: get_str_list(run, "start", "detectors"), + "#points": lambda run: run.get_run_md("start", "num_points"), + "Date": lambda run: utils.ts2iso(round(run.get_run_md("start", "time"))), + "Status": lambda run: run.get_run_md("stop", "exit_status"), + "Streams": lambda run: get_str_list(run, "summary", "stream_names"), + # "uid": lambda run: run.get_run_md("start", "uid"), + # "uid7": lambda run: run.get_run_md("start", "uid")[:7], } self.columnLabels = list(self.actions_library.keys()) - self.setPageOffset(DEFAULT_PAGE_OFFSET, init=True) - self.setPageSize(DEFAULT_PAGE_SIZE, init=True) - self.setAscending(True) - self._catalog_count = 0 - - self.setCatalog(data) - self.setUidList() - - super().__init__() + super().__init__(parent) + # print(f"{__name__}: {data=}") # ------------ methods required by Qt's view def rowCount(self, parent=None): + """Return the number of rows. Called by QTableView.""" # Want it to return the number of rows to be shown at a given time - value = len(self.uidList()) + value = len(self.runs) return value def columnCount(self, parent=None): + """Return the number of columns. 
Called by QTableView.""" # Want it to return the number of columns to be shown at a given time value = len(self.columnLabels) return value def data(self, index, role=None): + """Return the cell data. Called by QTableView.""" if role == QtCore.Qt.DisplayRole: # display data - logger.debug("Display role: %d, %d", index.row(), index.column()) - run = self.indexToRun(index) - label = self.columnLabels[index.column()] + row, column = index.row(), index.column() + label = self.columnLabels[column] action = self.actions_library[label] - return action(run) + run = list(self.runs.values())[row] + result = action(run) + logger.debug("Display role: (%d, %d) %s", row, column, result) + # print(f"{__name__}: ({row}, {column}) {result}") + return result elif role == QtCore.Qt.BackgroundRole: - run = self.indexToRun(index) - exit_status = tapi.get_md(run, "stop", "exit_status", "unknown") + run = list(self.runs.values())[index.row()] + exit_status = run.get_run_md("stop", "exit_status", "unknown") bgcolor = BGCLUT.get(exit_status, BGCLUT["other"]) if bgcolor is not None: return QtGui.QBrush(bgcolor) def headerData(self, section, orientation, role=QtCore.Qt.DisplayRole): + """Return the column label. Called by QTableView.""" if role == QtCore.Qt.DisplayRole: if orientation == QtCore.Qt.Horizontal: return self.columnLabels[section] @@ -96,183 +96,16 @@ def headerData(self, section, orientation, role=QtCore.Qt.DisplayRole): # ------------ methods required by the view - def doPager(self, action, value=None): - logger.debug("action=%s, value=%s", action, value) - - catalog_count = self.catalogCount() - offset = self.pageOffset() - size = self.pageSize() - logger.debug( - "catalog_count=%s, offset=%s, size=%s", catalog_count, offset, size - ) - - if action == "first": - self.setPageOffset(0) - elif action == "pageSize": - self.setPageSize(value) - elif action == "back": - value = offset - size - value = min(value, catalog_count) - value = max(value, 0) - self.setPageOffset(value) - elif action == "next": - value = offset + size - value = min(value, catalog_count - size) - value = max(value, 0) - self.setPageOffset(value) - elif action == "last": - value = catalog_count - size - value = max(value, 0) - self.setPageOffset(value) - - try: - self.setUidList() - except tapi.TiledServerError as exc: - # reset to previous values - self.setPageOffset(offset) - self.setPageSize(size) - - # re-raise for reporting in the view - raise exc - logger.debug("pageOffset=%s, pageSize=%s", self.pageOffset(), self.pageSize()) - - def isPagerAtStart(self): - return self.pageOffset() == 0 - - def isPagerAtEnd(self): - # number is zero-based - last_row_number = self.pageOffset() + len(self.uidList()) - return last_row_number >= self.catalogCount() - - # ------------ local methods - - def get_run_start_time(self, run): - """Return the run's start time as ISO8601 string.""" - ts = tapi.get_md(run, "start", "time", 0) - dt = datetime.datetime.fromtimestamp(round(ts)) - return dt.isoformat(sep=" ") - - def get_run_uid7(self, run): - """Return the run's uid, truncated to the first 7 characters.""" - uid = tapi.get_md(run, "start", "uid") - return uid[:7] - - def get_str_list(self, run, doc, key): - """Return the document's key values as a list.""" - items = tapi.get_md(run, doc, key, []) - return ", ".join(items) - - # ------------ get & set methods - - def catalog(self): - return self._catalog - - def catalogCount(self): - return self._catalog_count - - def setCatalog(self, catalog): - self._catalog = catalog - self._catalog_count = 
len(catalog) - - def uidList(self): - return self._uidList - - def setUidList(self): - self._uidList = tapi.get_tiled_slice( - self.catalog(), - self.pageOffset(), - self.pageSize(), - self.ascending(), - ) - - def pageOffset(self): - return self._pageOffset - - def pageSize(self): - return self._pageSize - - def setPageOffset(self, offset, init=False): - """Set the pager offset.""" - offset = int(offset) - if init: - self._pageOffset = offset - elif offset != self._pageOffset: - self._pageOffset = offset - self.layoutChanged.emit() - - def setPageSize(self, value, init=False): - """Set the pager size.""" - value = int(value) - if init: - self._pageSize = value - elif value != self._pageSize: - self._pageSize = value - self.layoutChanged.emit() - - def ascending(self): - return self._ascending - - def setAscending(self, value): - self._ascending = value - - def pagerStatus(self): - total = self.catalogCount() - if total == 0: - text = "No runs" - else: - start = self.pageOffset() - end = start + len(self.uidList()) - text = f"{start + 1}-{end} of {total} runs" - return text - - def indexToRun(self, index): - uid = self.uidList()[index.row()] - return self.catalog()[uid] - def getMetadata(self, index): - """Provide a text view of the run metadata.""" - run = self.indexToRun(index) - md = yaml.dump(dict(run.metadata), indent=4) - return md - - def getDataDescription(self, index): - """Provide text description of the data streams.""" - run = self.indexToRun(index) - - # Describe what will be plotted. - analysis = analyze_run.SignalAxesFields(run).to_dict() - table = pyRestTable.Table() - table.labels = "item description".split() - table.addRow(("scan", analysis["scan_id"])) - table.addRow(("plan", analysis["plan"])) - table.addRow(("chart", analysis["chart_type"])) - if analysis["plot_signal"] is not None: - table.addRow(("stream", analysis["stream"])) - table.addRow(("plot signal", analysis["plot_signal"])) - table.addRow(("plot axes", ", ".join(analysis["plot_axes"]))) - table.addRow(("all detectors", ", ".join(analysis["detectors"]))) - table.addRow(("all positioners", ", ".join(analysis["positioners"]))) - text = "plot summary" - text += "\n" + "-" * len(text) + "\n" * 2 - text += f"{table.reST()}\n" - - # information about each stream - rows = [] - for sname in run: - title = f"stream: {sname}" - rows.append(title) - rows.append("-" * len(title)) - stream = run[sname] - data = stream["data"].read() - rows.append(str(data)) - rows.append("") - - text += "\n".join(rows).strip() - return text - - def getSummary(self, index): - run = self.indexToRun(index) - return ( - f'#{tapi.get_md(run, "start", "scan_id", "unknown")}' - f' {tapi.get_md(run, "start", "plan_name", "unknown")}' - ) + """Return the selected run's metadata.""" + return list(self.runs.values())[index] + + def setRuns(self, runs): + """ + Define the run (metadata) to be shown in the table now. + + runs *dict(uid, metadata_dictionary)*: + Dictionary of run metadata, keyed by run uid. + """ + self.runs = runs + self.layoutChanged.emit() # Tell the view there is new data. diff --git a/gemviz/bluesky_runs_catalog_table_view.py b/gemviz/bluesky_runs_catalog_table_view.py index f7eae8d..a0529b4 100644 --- a/gemviz/bluesky_runs_catalog_table_view.py +++ b/gemviz/bluesky_runs_catalog_table_view.py @@ -16,7 +16,6 @@ from PyQt5 import QtCore from PyQt5 import QtWidgets -from . import tapi from . 
import utils logger = logging.getLogger(__name__) @@ -34,78 +33,45 @@ class BRCTableView(QtWidgets.QWidget): ui_file = utils.getUiFileName(__file__) run_selected = QtCore.pyqtSignal(object) - def __init__(self, parent): + def __init__(self, parent, catalog, page_offset, page_size): self.parent = parent - super().__init__() + self._catalog = catalog + self._catalog_length = len(catalog) + self.run_cache = {} + + super().__init__(parent) utils.myLoadUi(self.ui_file, baseinstance=self) - self.setup() + self.setup(page_offset, page_size) + + def setup(self, page_offset, page_size): + """Setup the catalog view panel.""" + from .bluesky_runs_catalog_table_model import BRCTableModel + + self.model = BRCTableModel(self) + self.tableView.setModel(self.model) - def setup(self): # since we cannot set header's ResizeMode in Designer ... header = self.tableView.horizontalHeader() header.setSectionResizeMode(QtWidgets.QHeaderView.ResizeToContents) + if self.pageSize.findText(str(page_size)) == -1: + self.pageSize.insertItem(0, str(page_size)) + self.pageSize.setCurrentText(str(page_size)) + self.setPage(page_offset, page_size) + + self.pageSize.currentTextChanged.connect( + partial(self.doPagerButtons, "pageSize") + ) for button_name in "first back next last".split(): button = getattr(self, button_name) # custom: pass the button name to the receiver button.released.connect(partial(self.doPagerButtons, button_name)) - self.pageSize.currentTextChanged.connect(self.doPageSize) - self.doButtonPermissions() - self.setPagerStatus() - self.tableView.clicked.connect(self.doRunSelectedSlot) - - def doPagerButtons(self, action, **kwargs): - # self.setStatus(f"{action=} {kwargs=}") - model = self.tableView.model() - - if model is not None: - try: - model.doPager(action) - self.setStatus(f"{model.pageOffset()=}") - self.doButtonPermissions() - self.setPagerStatus() - except tapi.TiledServerError as exc: - self.setStatus(str(exc)) - dialog = QtWidgets.QMessageBox(self) - dialog.setWindowTitle("Notice") - dialog.setIcon(dialog.Warning) - dialog.setText(f"Error when paging.\n{exc}") - dialog.exec() - - def doPageSize(self, value): - # self.setStatus(f"doPageSize {value =}") - model = self.tableView.model() - - if model is not None: - model.doPager("pageSize", value) - self.doButtonPermissions() - self.setPagerStatus() - - def doButtonPermissions(self): - model = self.tableView.model() - atStart = False if model is None else model.isPagerAtStart() - atEnd = False if model is None else model.isPagerAtEnd() - - self.first.setEnabled(not atStart) - self.back.setEnabled(not atStart) - self.next.setEnabled(not atEnd) - self.last.setEnabled(not atEnd) - - def displayTable(self): - from .bluesky_runs_catalog_table_model import BRCTableModel - - self.cat = self.parent.brc_search_panel.filteredCatalog() - data_model = BRCTableModel(self.cat) - # self.setStatus(f"Displaying catalog: {self.cat.item['id']!r}") - page_size = self.pageSize.currentText() # remember the current value - self.tableView.setModel(data_model) - self.doPageSize(page_size) # restore - self.setPagerStatus() self.parent.brc_search_panel.enableDateRange( len(self.parent.brc_search_panel.catalog()) > 0 ) - labels = data_model.columnLabels + + labels = self.model.columnLabels def centerColumn(label): if label in labels: @@ -116,19 +82,135 @@ def centerColumn(label): centerColumn("Scan ID") centerColumn("#points") + self.setButtonPermissions() + self.setPagerStatus() + self.tableView.clicked.connect(self.doRunSelectedSlot) + + def doPagerButtons(self, action, 
**kwargs): + """User clicked a button to change the page.""" + logger.debug("action=%s", action) + + if action == "first": + self.setPage(0, self.page_size) + elif action == "back": + self.setPage(self.page_offset - self.page_size, self.page_size) + elif action == "pageSize": + self.setPage(self.page_offset, self.pageSize.currentText()) + elif action == "next": + self.setPage(self.page_offset + self.page_size, self.page_size) + elif action == "last": + self.setPage(-1, self.page_size) + + self.setButtonPermissions() + self.setPagerStatus() + + @property + def pagerAtStart(self): + """Is this the first page?""" + return self.page_offset == 0 + + @property + def pagerAtEnd(self): + """Is this the last page?""" + # number is zero-based + return (self.page_offset + self.page_size) >= self.catalogLength() + + def setButtonPermissions(self): + """Enable/disable the pager buttons, depending on page in view.""" + first_page = self.pagerAtStart + last_page = self.pagerAtEnd + + self.first.setEnabled(not first_page) + self.back.setEnabled(not first_page) + self.next.setEnabled(not last_page) + self.last.setEnabled(not last_page) + + def setPage(self, offset, size): + """Choose the page. Update the model.""" + # user cannot edit directly, not expected to raise an exception + offset = int(offset) + size = int(size) + + self.page_size = max(0, min(size, self.catalogLength())) + if offset >= 0: + offset = min(offset, self.catalogLength() - self.page_size) + else: + offset = self.catalogLength() - self.page_size + self.page_offset = max(0, offset) + if int(self.pageSize.currentText()) != self.page_size: + self.pageSize.setCurrentText(str(self.page_size)) + logger.debug( + "len(catalog)=%d offset=%d size=%d", + self.catalogLength(), + self.page_offset, + self.page_size, + ) + + # TODO: unselect row if selection is not on the page + # see: https://stackoverflow.com/questions/64225673 + # "how-to-deselect-an-entire-qtablewidget-row" + + self.updateModelData() + + def updateModelData(self): + """Send a new page of runs to the model.""" + from . import tapi + + # get list of metadata for each run to be shown in the table + start = self.page_offset + end = self.page_offset + self.page_size + uid_list = self.catalog().keys()[start:end] + + page = {} # the new page of run metadata + for uid in uid_list: + run_md = self.run_cache.get(uid) + if run_md is None or run_md.active: + # Get new information from the server about this run. + run_md = tapi.RunMetadata(self.catalog(), uid) + self.run_cache[uid] = run_md # update the cache + page[uid] = run_md + + # Send the page of runs to the model now. 
+ self.model.setRuns(page) + def setPagerStatus(self, text=None): if text is None: - model = self.tableView.model() - if model is not None: - text = model.pagerStatus() + total = self.catalogLength() # filtered catalog + if total == 0: + text = "No runs" + else: + start = self.page_offset + end = start + self.page_size + text = f"{start + 1}-{end} of {total} runs" self.status.setText(text) self.setStatus(text) def doRunSelectedSlot(self, index): - model = self.tableView.model() - if model is not None: - self.run_selected.emit(model.indexToRun(index)) + run_md = list(self.model.runs.values())[index.row()] + self.run_selected.emit(run_md) + + def setCatalog(self, catalog): + self._catalog = catalog # filtered catalog + self._catalog_length = len(catalog) + + uid = self.parent.selected_run_uid + if uid in self.model.runs: + offset = list(self.model.runs.keys()).index(uid) + else: + offset = -1 + self.setPage(offset, self.page_size) # ... and update the model + self.setPagerStatus() + + def catalog(self): + return self._catalog + + def catalogLength(self): + # Avoid a bug in the tiled client. When the client is asked for + # len(catalog) frequently, it will begin to return a length of ``1`` + # instead of the actual length. After waiting a short while, the client + # will return the actual length again. + return self._catalog_length def setStatus(self, text): self.parent.setStatus(text) diff --git a/gemviz/date_time_range_slider.py b/gemviz/date_time_range_slider.py index 3c2e98d..090efca 100644 --- a/gemviz/date_time_range_slider.py +++ b/gemviz/date_time_range_slider.py @@ -10,12 +10,6 @@ from . import utils -SECOND = 1 -MINUTE = 60 * SECOND -HOUR = 60 * MINUTE -DAY = 24 * HOUR -WEEK = 7 * DAY - DEFAULT_MINIMUM = "1995-01-01" DEFAULT_LOW = "2023-01-01" DEFAULT_HIGH = "2024-05-01" @@ -82,11 +76,11 @@ def setup(self): def _timestamp_units(self, slider): """Convert slider units (days) to timestamp units (seconds).""" - return slider * DAY + return slider * utils.DAY def _slider_units(self, timestamp): """Convert timestamp units (seconds) to slider units (days).""" - return int(timestamp / DAY) + return int(timestamp / utils.DAY) def adjustDates(self, low, high): """ diff --git a/gemviz/mainwindow.py b/gemviz/mainwindow.py index 0906ca6..15276d1 100644 --- a/gemviz/mainwindow.py +++ b/gemviz/mainwindow.py @@ -1,3 +1,9 @@ +""" +gemviz main window +""" + +# TODO: remove testing URLs before production + import logging from PyQt5 import QtCore @@ -11,10 +17,12 @@ from .tiledserverdialog import TILED_SERVER_SETTINGS_KEY from .user_settings import settings -# TODO: remove testing URLs before production - +TESTING_URLS = [TESTING_URL, LOCALHOST_URL] MAX_RECENT_URI = 5 UI_FILE = utils.getUiFileName(__file__) +SORT_ASCENDING = 1 +SORT_DESCENDING = -SORT_ASCENDING +SORT_DIRECTION = SORT_ASCENDING logger = logging.getLogger(__name__) @@ -109,7 +117,7 @@ def catalogType(self): def catalogName(self): return self._catalogName - def setCatalog(self, catalog_name): + def setCatalog(self, catalog_name, sort_direction=SORT_DIRECTION): """A catalog was selected (from the pop-up menu).""" self.setStatus(f"Selected catalog {catalog_name!r}.") if len(catalog_name) == 0 or catalog_name not in self.server(): @@ -117,7 +125,7 @@ def setCatalog(self, catalog_name): self.setStatus(f"Catalog {catalog_name!r} is not supported now.") return self._catalogName = catalog_name - self._catalog = self.server()[catalog_name] + self._catalog = self.server()[catalog_name].sort(("time", sort_direction)) spec_name = self.catalogType() 
self.spec_name.setText(spec_name) @@ -132,13 +140,27 @@ def setCatalog(self, catalog_name): self.mvc_catalog = BRC_MVC(self) layout.addWidget(self.mvc_catalog) else: + # Not expected to run this branch since cannot select + # catalog we cannot handle. self.mvc_catalog = None layout.addWidget(QtWidgets.QWidget()) # nothing to show def setCatalogs(self, catalogs): - """Set the names (of server's catalogs) in the pop-up list.""" + """ + Set the names (of server's catalogs) in the pop-up list. + + Only add catalogs of CatalogOfBlueskyRuns. + """ self.catalogs.clear() - self.catalogs.addItems(catalogs) + for catalog_name in catalogs: + try: + spec = self.server()[catalog_name].specs[0] + if spec.name == "CatalogOfBlueskyRuns" and spec.version == "1": + self.catalogs.addItem(catalog_name) + except Exception as exc: + message = f"Problem with catalog {catalog_name}: {exc}" + logger.debug(message) + self.setStatus(message) def clearContent(self, clear_cat=True): layout = self.groupbox.layout() @@ -161,9 +183,11 @@ def setServerList(self, selected_uri=None): ] settings.setKey(TILED_SERVER_SETTINGS_KEY, ",".join(final_uri_list)) else: - # if no server selected in open dialog, keep the first pull down menu value to "" + # if no server selected in open dialog, + # keep the first pull down menu value to "" final_uri_list = [""] + recent_uris_list[:MAX_RECENT_URI] - final_uri_list = [*final_uri_list, TESTING_URL, LOCALHOST_URL, "Other..."] + final_uri_list += TESTING_URLS + final_uri_list.append("Other...") self._serverList = final_uri_list def setServers(self, selected_uri): diff --git a/gemviz/resources/bluesky_runs_catalog.ui b/gemviz/resources/bluesky_runs_catalog.ui index ce47dd4..3a5135e 100644 --- a/gemviz/resources/bluesky_runs_catalog.ui +++ b/gemviz/resources/bluesky_runs_catalog.ui @@ -44,7 +44,7 @@ - Stream Fields + Select Fields to Plot diff --git a/gemviz/select_stream_fields.py b/gemviz/select_stream_fields.py index b7cdf26..66fc955 100644 --- a/gemviz/select_stream_fields.py +++ b/gemviz/select_stream_fields.py @@ -3,20 +3,16 @@ .. autosummary:: - ~SelectStreamsWidget + ~SelectFieldsWidget ~to_datasets """ -import datetime import logging -from dataclasses import dataclass from PyQt5 import QtCore from PyQt5 import QtWidgets -from . import tapi from . import utils -from .analyze_run import SignalAxesFields from .select_fields_tablemodel import ColumnDataType from .select_fields_tablemodel import FieldRuleType from .select_fields_tablemodel import TableColumn @@ -34,28 +30,33 @@ ] -class SelectStreamsWidget(QtWidgets.QWidget): +class SelectFieldsWidget(QtWidgets.QWidget): + """Panel to select fields (signals) for plotting.""" + ui_file = utils.getUiFileName(__file__) selected = QtCore.pyqtSignal(str, str, dict) - def __init__(self, parent, run, default_stream=DEFAULT_STREAM): + def __init__(self, parent, run): self.parent = parent - self.run = run - self.analysis = SignalAxesFields(run) - self.stream_name = default_stream + self.run = run # tapi.RunMetadata object + self.analysis = run.plottable_signals() + self.stream_name = self.analysis.get("stream", DEFAULT_STREAM) super().__init__() utils.myLoadUi(self.ui_file, baseinstance=self) self.setup() def setup(self): - self.run_summary.setText(tapi.run_summary(self.run)) - - stream_list = list(self.run) - if self.stream_name in stream_list: + self.run_summary.setText(self.run.summary()) + + stream_list = list(self.run.stream_metadata()) + if "baseline" in stream_list: + # Too many signals! 2 points each. Do not plot from "baseline" stream. 
+ stream_list.pop(stream_list.index("baseline")) + index = stream_list.index(self.stream_name) + if index > 0: # Move the default stream to the first position. - stream_list.remove(self.stream_name) - stream_list.insert(0, self.stream_name) + stream_list.insert(0, stream_list.pop(index)) if len(stream_list) > 0: self.setStream(stream_list[0]) @@ -68,27 +69,31 @@ def setStream(self, stream_name): from functools import partial self.stream_name = stream_name - stream = self.run[stream_name] + stream = self.run.run[stream_name] logger.debug("stream_name=%s, stream=%s", stream_name, stream) - # TODO: This is for 1-D. Generalize for multi-dimensional. - # hint: Checkbox column in the columns table might provide. - x_name = None - y_name = None - if stream_name == self.analysis.stream_name: - if len(self.analysis.plot_axes) > 0: - x_name = self.analysis.plot_axes[0] - y_name = self.analysis.plot_signal + x_names = self.analysis["plot_axes"] + y_name = self.analysis["plot_signal"] # describe the data fields for the dialog. + sdf = self.run.stream_data_fields(stream_name) + # print(f"{__name__}.{__class__.__name__}: {sdf=}") fields = [] - for field_name in tapi.stream_data_fields(stream): + for field_name in sdf: selection = None - if x_name is not None and field_name == x_name: + if x_names is not None and field_name in x_names: selection = "X" elif y_name is not None and field_name == y_name: selection = "Y" - shape = tapi.stream_data_field_shape(stream, field_name) + shape = self.run.stream_data_field_shape(stream_name, field_name) + if len(shape) == 0: + # print(f"{stream_name=} {field_name=} {shape=}") + logger.debug( + "stream_name=%s field_name=%s shape=%s", + stream_name, + field_name, + shape, + ) field = TableField(field_name, selection=selection, shape=shape) fields.append(field) logger.debug("fields=%s", fields) @@ -108,10 +113,12 @@ def relayPlotSelections(self, stream_name, action, selections): self.selected.emit(stream_name, action, selections) -def to_datasets(stream, selections, scan_id=None): +def to_datasets(run, stream_name, selections, scan_id=None): """Prepare datasets and options for plotting.""" from . import chartview + stream = run.stream_data(stream_name) + x_axis = selections.get("X") x_datetime = False # special scaling using datetime if x_axis is None: @@ -119,9 +126,9 @@ def to_datasets(stream, selections, scan_id=None): x_units = "" x_axis = "data point number" else: - x_data = stream["data"][x_axis].compute() + x_data = stream[x_axis].compute() x_shape = x_data.shape - x_units = tapi.stream_data_field_units(stream, x_axis) + x_units = run.stream_data_field_units(stream_name, x_axis) if len(x_shape) != 1: # fmt: off raise ValueError( @@ -134,13 +141,16 @@ def to_datasets(stream, selections, scan_id=None): x_datetime = True datasets = [] - for y_axis in selections.get("Y", []): + y_selections = selections.get("Y", []) + if len(y_selections) == 0: + raise ValueError("No Y data selected.") + for y_axis in y_selections: ds, ds_options = [], {} color = chartview.auto_color() symbol = chartview.auto_symbol() - y_data = stream["data"][y_axis].compute() - y_units = tapi.stream_data_field_units(stream, y_axis) + y_data = stream[y_axis].compute() + y_units = run.stream_data_field_units(stream_name, y_axis) y_shape = y_data.shape if len(y_shape) != 1: # fmt: off @@ -148,11 +158,9 @@ def to_datasets(stream, selections, scan_id=None): "Can only plot 1-D data now." 
f" {y_axis} shape is {y_shape}" ) - suffix = stream.metadata["stream_name"] - run_uid = stream.metadata["descriptors"][0].get("run_start", "") - if scan_id is not None: - suffix = f"#{scan_id} {suffix} {run_uid[:7]}" - ds_options["name"] = f"{y_axis} ({suffix})" + + run_uid = run.get_run_md("start", "uid") + ds_options["name"] = f"{y_axis} ({run.summary()} {run_uid[:7]})" ds_options["pen"] = color # line color ds_options["symbol"] = symbol ds_options["symbolBrush"] = color # fill color diff --git a/gemviz/tapi.py b/gemviz/tapi.py index 439ed96..eb88582 100644 --- a/gemviz/tapi.py +++ b/gemviz/tapi.py @@ -4,34 +4,260 @@ .. autosummary: ~connect_tiled_server - ~get_md ~get_tiled_runs ~QueryTimeSince ~QueryTimeUntil - ~run_description_table - ~run_summary - ~run_summary_table - ~stream_data_field_pv - ~stream_data_field_shape - ~stream_data_field_units - ~stream_data_fields + ~RunMetadata ~TiledServerError """ -import datetime +import logging import tiled import tiled.queries from httpx import HTTPStatusError -from . import utils +logger = logging.getLogger(__name__) class TiledServerError(RuntimeError): """An error from the tiled server.""" +class RunMetadata: + """Cache the metadata for a single run.""" + + def __init__(self, cat, uid): + self.catalog = cat + self.uid = uid + self.request_from_tiled_server() + + def __str__(self) -> str: + return ( + f"{__class__.__name__}(catalog={self.catalog.item['id']!r}," + f" uid7={self.uid[:7]!r}," + f" active={self.active})" + ) + + def request_from_tiled_server(self): + """Get run details from server.""" + self.run = self.catalog[self.uid] + self.run_md = self.run.metadata + self.active = ( + self.uid == self.catalog.keys().last() and "stop" not in self.run_md + ) + self.streams_md = None + self.streams_data = None + + def get_run_md(self, doc, key, default=None): + """Get metadata by key from run document.""" + return (self.run_md.get(doc) or {}).get(key, default) + + def plottable_signals(self): + """ + Return a dict with the plottable data for this run. + + * field: any available numeric data keys + * motors: any data keys for motors declared by the run + * detectors: any numeric data keys that are not motors or excluded names + * plot_signal: the first detector signal + * plot_axes: the first motor signal for each dimension + + * run.metadata[hints][dimensions] show the independent axes object names + * Any given dimension may have more than one motor object (a2scan, ...) + * This code chooses to only use the first motor of each dimension. + * The stream descriptor list is usually length = 1. 
+ * object_keys are used to get lists of data_keys (fields) + """ + + def find_name_device_or_signal(key): + if key in stream_hints: # from ophyd.Device + return stream_hints[key]["fields"] + elif key in descriptor["data_keys"]: # from ophyd.Signal + return [key] + raise KeyError(f"Could not find {key=}") + + def get_signal(key): + try: + return find_name_device_or_signal(key)[0] # just the first one + except KeyError: + if key != "time": + raise KeyError(f"Unexpected key: {key!r}") + return key # "time" is a special case + + def is_numeric(signal): + dtype = descriptor["data_keys"][signal]["dtype"] + if dtype == "array": + stream_data = self.stream_data(self.stream_name) + ntype = stream_data["data"][signal].dtype.name + if ntype.startswith("int") or ntype.startswith("float"): + dtype = "number" + return dtype == "number" + + # dimensions of the run + run_dims = self.get_run_md("start", "hints", {}).get("dimensions", []) + + # data stream to be used + streams = [d[1] for d in run_dims] + if len(set(streams)) != 1: + # All hinted dimensions should come from the same stream. + raise ValueError(f"Not handling hinted dimensions: {run_dims=}") + stream = streams[0] + + # description of the data stream objects + descriptors = self.stream_metadata(stream).get("descriptors", {}) + if len(descriptors) != 1: + raise ValueError(f"Not handling situation of {len(descriptors)=}") + + descriptor = descriptors[0] + + # Mapping from object_keys to data_keys. + stream_hints = descriptor.get("hints", {}) + + # First motor signal for each dimension. + try: + axes = [get_signal(d[0][0]) for d in run_dims] + except KeyError as exc: + raise exc + + # All motor signals. + motors = [ + signal + for motor in self.get_run_md("start", "motors", []) + for signal in find_name_device_or_signal(motor) + ] + + # All detector signals. + detectors = [ + signal + for detector in self.get_run_md("start", "detectors") + for signal in find_name_device_or_signal(detector) + if is_numeric(signal) + ] + + fields = [] + for descriptor in descriptors: + hints = descriptor.get("hints", {}) + for obj_name in descriptor["object_keys"]: + try: + signals = hints.get(obj_name, {})["fields"] + except KeyError: + # ``ophyd.Device`` can have multiple signals + signals = descriptor["object_keys"].get(obj_name, []) + fields.extend([k for k in signals if is_numeric(k)]) + + status = self.get_run_md("stop", "exit_status") + plot_signal = None + if status in "abort success".split(): + # These runs probably have plottable data fields. + + # Do not choose any of these fields as the default + # (NeXus-style plottable) signal data. + not_plottable_signals = """ + timebase + preset_time + """.split() + + names_to_avoid = motors + not_plottable_signals + possible_signals = detectors + fields + for field in possible_signals: + if field not in names_to_avoid: + plot_signal = field + break + + return { + "catalog": self.catalog.item["id"], + "uid": self.uid, + "stream": stream, + "plot_signal": plot_signal, + "plot_axes": axes, + "motors": motors, + "detectors": detectors, + "fields": fields, + } + + def stream_data(self, stream_name): + """Return the data structure for this stream.""" + if self.streams_data is None: + # Optimize with a cache. 
+ self.streams_data = { + sname: self.run[sname]["data"].read() for sname in self.run + } + + return self.streams_data[stream_name] + + def stream_data_field_shape(self, stream_name, field_name): + """Shape of this data field.""" + stream = self.stream_data(stream_name) + try: + shape = stream[field_name].shape + except Exception: + shape = () + return shape + + def stream_data_fields(self, stream_name): + """ + Data field (names) of this BlueskyEventStream. + + Sort the list by relevance. + + First "time" (epoch timestamp for each event document), then "config" (the + caller provided these names as parameters for this stream), then "data" + (other signals in this stream, usually added from a Device hint). + """ + fields = sorted(self.stream_data(stream_name)) + + # Promote "time" field to first place. + if "time" in fields: + fields.remove("time") + fields.insert(0, "time") + return fields + + def stream_data_field_pv(self, stream_name, field_name): + """EPICS PV name of this field.""" + pv = "" + try: + descriptors = self.stream_metadata(stream_name).get("descriptors", {}) + assert len(descriptors) == 1, f"{stream_name=} has {len(descriptors)=}" + source = descriptors[0]["data_keys"][field_name].get("source", "") + if source.startswith("PV:"): + pv = source[3:] + except Exception: + pass + return pv + + def stream_data_field_units(self, stream_name, field_name): + """Engineering units of this field.""" + units = "" + try: + descriptors = self.stream_metadata(stream_name).get("descriptors", {}) + assert len(descriptors) == 1, f"{stream_name=} has {len(descriptors)=}" + units = descriptors[0]["data_keys"][field_name].get("units", "") + except Exception: + pass + return units + + def stream_metadata(self, stream_name=None): + """Return the metadata dictionary for this stream.""" + if self.streams_md is None: + # Optimize with a cache. + self.streams_md = {sname: self.run[sname].metadata for sname in self.run} + + if stream_name is None: + return self.streams_md + return self.streams_md[stream_name] + + def summary(self): + """Summary (text) of this run.""" + return ( + f"{self.get_run_md('start', 'plan_name', '')}" + f" {self.get_run_md('start', 'scan_id', '?')}" + f" {self.get_run_md('start', 'title', '')}" + ).strip() + + def connect_tiled_server(uri): + """Make connection with the tiled server URI. Return a client object.""" from tiled.client import from_uri # leave out "dask" and get numpy by default @@ -41,11 +267,6 @@ def connect_tiled_server(uri): return client -def get_md(parent, doc, key, default=None): - """Cautiously, get metadata from tiled object by document and key.""" - return (parent.metadata.get(doc) or {}).get(key) or default - - def get_tiled_slice(cat, offset, size, ascending=True): end = offset + size key_gen = cat.keys() @@ -65,18 +286,22 @@ def get_tiled_slice(cat, offset, size, ascending=True): def QueryTimeSince(isotime): """Tiled client query: all runs since given date/time.""" + from . import utils + return tiled.queries.Key("time") >= utils.iso2ts(isotime) def QueryTimeUntil(isotime): """Tiled client query: all runs until given date/time.""" + from . 
import utils + return tiled.queries.Key("time") <= utils.iso2ts(isotime) def get_run(uri=None, catalog="training", reference=None): """Get referenced run from tiled server catalog.""" - from gemviz.tapi import connect_tiled_server - from gemviz.tapi import get_tiled_runs + # from gemviz.tapi import connect_tiled_server + # from gemviz.tapi import get_tiled_runs uri = uri or "http://localhost:8020" client = connect_tiled_server(uri) @@ -124,137 +349,3 @@ def get_tiled_runs(cat, since=None, until=None, text=[], text_case=[], **keys): for v in text_case: cat = cat.search(tiled.queries.FullText(v, case_sensitive=True)) return cat - - -def run_description_table(run): - """Provide text description of the data streams.""" - import pyRestTable - - from . import analyze_run - - # Describe what will be plotted. - analysis = analyze_run.SignalAxesFields(run).to_dict() - table = pyRestTable.Table() - table.labels = "item description".split() - table.addRow(("scan", analysis["scan_id"])) - table.addRow(("plan", analysis["plan"])) - table.addRow(("chart", analysis["chart_type"])) - if analysis["plot_signal"] is not None: - table.addRow(("stream", analysis["stream"])) - table.addRow(("plot signal", analysis["plot_signal"])) - table.addRow(("plot axes", ", ".join(analysis["plot_axes"]))) - table.addRow(("all detectors", ", ".join(analysis["detectors"]))) - table.addRow(("all positioners", ", ".join(analysis["positioners"]))) - text = "plot summary" - text += "\n" + "-" * len(text) + "\n" * 2 - text += f"{table.reST()}\n" - - # information about each stream - rows = [] - for sname in run: - title = f"stream: {sname}" - rows.append(title) - rows.append("-" * len(title)) - stream = run[sname] - data = stream["data"].read() - rows.append(str(data)) - rows.append("") - - text += "\n".join(rows).strip() - return text - - -def run_summary(run): - """Summary (text) of this run.""" - md = run.metadata - return ( - f"{md.get('start', {}).get('plan_name', '')}" - f" #{md.get('start', {}).get('scan_id', '?')}" - # f" {utils.ts2iso(md.get('start', {}).get('time', 0))}" - # f" ({md.get('start', {}).get('uid', '')[:7]})" - f" {md.get('start', {}).get('title', '')}" - ).strip() - - -def run_summary_table(runs): - import pyRestTable - - table = pyRestTable.Table() - table.labels = "# uid7 scan# plan #points exit started streams".split() - for i, uid in enumerate(runs, start=1): - run = runs[uid] - md = run.metadata - t0 = md["start"].get("time") - table.addRow( - ( - i, - uid[:7], - md["summary"].get("scan_id"), - md["summary"].get("plan_name"), - md["start"].get("num_points"), - (md["stop"] or {}).get("exit_status"), # if no stop document! - datetime.datetime.fromtimestamp(t0).isoformat(sep=" "), - ", ".join(md["summary"].get("stream_names")), - ) - ) - return table - - -def stream_data_fields(stream): - """ - Data field (names) of this BlueskyEventStream. - - Sort the list by relevance. - - First "time" (epoch timestamp for each event document), then "config" (the - caller provided these names as parameters for this stream), then "data" - (other signals in this stream, usually added from a Device hint). - """ - # List any stream["config"] names first. - fields = sorted(stream.get("config", [])) - - # Other names from "data" are sorted alphabetically. - for nm in sorted(stream.get("data", [])): - if nm not in fields: - fields.append(nm) - - # Promote "time" field to first place. 
-    if "time" in fields:
-        fields.remove("time")
-        fields.insert(0, "time")
-    return fields
-
-
-def stream_data_field_shape(stream, field_name):
-    """Shape of this data field."""
-    try:
-        shape = stream["data"][field_name].shape
-    except Exception:
-        shape = ()
-    return shape
-
-
-def stream_data_field_pv(stream, field_name):
-    """EPICS PV name of this field."""
-    pv = ""
-    try:
-        descriptors = list(stream.metadata["descriptors"])
-        assert len(descriptors) == 1, f"{stream=} has {len(descriptors)=}"
-        source = descriptors[0]["data_keys"][field_name].get("source", "")
-        if source.startswith("PV:"):
-            pv = source[3:]
-    except Exception:
-        pass
-    return pv
-
-
-def stream_data_field_units(stream, field_name):
-    """Engineering units of this field."""
-    units = ""
-    try:
-        descriptors = list(stream.metadata["descriptors"])
-        assert len(descriptors) == 1, f"{stream=} has {len(descriptors)=}"
-        units = descriptors[0]["data_keys"][field_name].get("units", "")
-    except Exception:
-        pass
-    return units
diff --git a/gemviz/utils.py b/gemviz/utils.py
index 61de62b..cd800bf 100644
--- a/gemviz/utils.py
+++ b/gemviz/utils.py
@@ -21,6 +21,13 @@
 logger = logging.getLogger(__name__)
 
 
+SECOND = 1
+MINUTE = 60 * SECOND
+HOUR = 60 * MINUTE
+DAY = 24 * HOUR
+WEEK = 7 * DAY
+
+
 def iso2dt(iso_date_time):
     """Convert ISO8601 time string to datetime object."""
     return datetime.datetime.fromisoformat(iso_date_time)
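
Usage sketch of the refactored tapi.RunMetadata API introduced above. This is an illustrative sketch, not part of the patch: the server URI and the "training" catalog name are the defaults used by tapi.get_run(), and it assumes a completed 1-D scan with a hinted plottable signal.

    from gemviz import tapi
    from gemviz.select_stream_fields import to_datasets

    client = tapi.connect_tiled_server("http://localhost:8020")  # default URI in tapi.get_run()
    catalog = client["training"].sort(("time", 1))  # ascending by time, as in MainWindow.setCatalog()

    uid = catalog.keys().last()           # most recent run in the catalog
    run = tapi.RunMetadata(catalog, uid)  # caches run metadata, stream metadata, and stream data

    print(run.summary())                  # "<plan_name> <scan_id> <title>"
    info = run.plottable_signals()        # dict: stream, plot_signal, plot_axes, motors, detectors, fields
    data = run.stream_data(info["stream"])  # field data for that stream, read from the tiled server

    # Build plot-ready datasets the same way BRC_MVC.doPlotSlot() does.
    selections = {"X": info["plot_axes"][0], "Y": [info["plot_signal"]]}
    datasets, options = to_datasets(
        run, info["stream"], selections, scan_id=run.get_run_md("start", "scan_id")
    )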