Source code for uwsift.control.time_transformer_policies

import logging
from datetime import datetime
from typing import List, Optional

import numpy as np
from PyQt5.QtCore import QObject, pyqtSignal

from uwsift.model.layer_item import LayerItem

LOG = logging.getLogger(__name__)


# TODO: Add Policy protocol class to help with type checking and to define an interface


class WrappingDrivingPolicy(QObject):
    """
    Time translation policy that returns a simulated data time taken from a
    designated data layer (the driving layer).

    Each call of :meth:`compute_t_sim` moves one step along the driving layer's
    timeline and returns the timestamp found there. When one end of the timeline
    is passed, the policy wraps around and continues from the other end.
    """

    # Emitted whenever the policy (re-)selects its driving layer. The payload
    # should be a LayerItem, but it can be None if no driving layer could be chosen.
    didUpdatePolicy = pyqtSignal(object)

    def __init__(self, layers: List[LayerItem]):
        """
        :param layers: list of layers to choose the driving layer from. The list
                       is stored by reference, not copied.
        """
        super().__init__()
        self._layers: List[LayerItem] = layers
        self._driving_idx = 0
        self._curr_t_sim: Optional[datetime] = None
        self._timeline: Optional[List[datetime]] = None
        self._driving_layer: Optional[LayerItem] = None
        # Never assigned anywhere in this class; only kept so that the attribute
        # still exists.
        self._driving_layer_uuid = None

    def _get_dynamic_layers(self):
        """Return all layers whose ``dynamic`` attribute is truthy, in list order."""
        return [layer for layer in self._layers if layer.dynamic]

    def _driving_layer_index_in_layers(self) -> Optional[int]:
        """
        Return the position of the current driving layer within the dynamic
        layers, or None if there is no driving layer or it is not among them.
        """
        if self._driving_layer is None:
            return None
        try:
            # Look for the layer object itself; the UUID attribute is never set
            # and therefore can never be found.
            return self._get_dynamic_layers().index(self._driving_layer)
        except ValueError:
            return None

    def _get_next_possible_driving_layer(self) -> Optional[LayerItem]:
        """Return the first dynamic layer, or None (after logging) if there is none."""
        candidate = next(iter(self._get_dynamic_layers()), None)
        if candidate is None:
            LOG.info("No suitable driving layer found!")
        return candidate

    def on_layers_update(self):
        """
        Slot connected to LayerModel's 'didUpdateLayers' signal.

        Keeps the current driving layer as long as it is still one of the dynamic
        layers; otherwise takes the first dynamic layer. In both cases the
        timeline is read again from the driving layer and 'didUpdatePolicy' is
        emitted with the resulting driving layer.
        """
        # Compare against None explicitly: index 0 is a valid position.
        if self._driving_layer_index_in_layers() is not None:
            # Re-assigning the same layer goes through the setter, which re-reads
            # the layer's timeline and keeps the position nearest to the
            # current simulation time.
            self.driving_layer = self._driving_layer
        else:
            self.driving_layer = self._get_next_possible_driving_layer()
        self.didUpdatePolicy.emit(self.driving_layer)

    def change_timebase(self, layer):
        """
        Make ``layer`` the driving layer and emit 'didUpdatePolicy' with it.

        :param layer: the layer to drive the simulated time from.
        """
        self.driving_layer = layer
        self.didUpdatePolicy.emit(layer)

    @property
    def timeline_length(self):
        """Number of timestamps in the timeline; 0 if it is None or empty."""
        return len(self._timeline) if self._timeline else 0

    @property
    def driving_layer(self):
        """The layer whose timeline currently drives the simulated time (or None)."""
        return self._driving_layer

    @driving_layer.setter
    def driving_layer(self, layer: Optional[LayerItem]):
        if not layer or not layer.dynamic:
            # Nothing usable to drive the time: reset the policy.
            self._driving_layer = None
            self._driving_idx = 0
            self._timeline = None
        elif not self._driving_layer:
            # First driving layer: start at the beginning of its timeline.
            self._driving_layer = layer
            self.timeline = list(layer.timeline.keys())
            self._driving_idx = 0
        else:
            # Retrieve time step of new timeline analogous to previous
            # simulation time (stored in self._curr_t_sim).
            self.timeline = list(layer.timeline.keys())
            self._driving_layer = layer
            nearest_past_idx = self._find_nearest_past(self._curr_t_sim)
            self._driving_idx = nearest_past_idx if nearest_past_idx is not None else 0
        # Keep the cached simulation time in sync with the (possibly new) index.
        self._curr_t_sim = self.timeline[self._driving_idx] if self.timeline else None

    @property
    def timeline(self):
        """The list of timestamps taken from the driving layer (or None)."""
        return self._timeline

    @timeline.setter
    def timeline(self, timeline: List[datetime]):
        self._timeline = timeline

    def _find_nearest_past(self, tstamp: Optional[datetime]) -> Optional[int]:
        """
        Upon driving layer change find the nearest past tstamp in the new
        driving layer and return its index.

        :param tstamp: reference time, usually the previous simulation time.
        :return: index into ``self.timeline`` of the latest timestamp that is not
                 later than ``tstamp``; None if ``tstamp`` is None, the timeline
                 is None/empty, or every timestamp is later than ``tstamp``.
        """
        if tstamp is None or not self.timeline:
            return None
        nearest_idx: Optional[int] = None
        for idx, candidate in enumerate(self.timeline):
            # The result must index the full timeline, not a filtered subset,
            # so the position is tracked while scanning the original list.
            if candidate <= tstamp and (nearest_idx is None or candidate > self.timeline[nearest_idx]):
                nearest_idx = idx
        return nearest_idx

    def curr_t_sim(self):
        """Return the current simulation time after checking it matches the timeline position."""
        if not self.timeline:
            assert self._curr_t_sim is None  # nosec B101
        else:
            assert self._curr_t_sim == self.timeline[self._driving_idx]  # nosec B101
        return self._curr_t_sim

    def curr_timeline_index(self):
        """Return the current index into the driving layer's timeline."""
        return self._driving_idx

    def jump_to_t_sim(self, index: int) -> datetime:
        """Returns t_sim by looking up the driving layer's timestamp at the provided index location.

        Raises an exception if being called with an invalid index. In that case
        the state of the policy is left untouched.

        :param index: integer location in the timeline to jump to.
        :return: datetime object located at the provided index.
        """
        try:
            t_sim = self.timeline[index]
        except Exception:
            LOG.error(f"Invalid index passed to driving layer timeline: " f"index={index}\n", exc_info=True)
            raise
        # Commit only after the lookup succeeded, and keep the cached simulation
        # time consistent with the new index.
        self._driving_idx = index
        self._curr_t_sim = t_sim
        return t_sim

    def compute_t_sim(self, tick_time: int, backwards: bool = False) -> Optional[datetime]:
        """
        Advance one step along the driving layer's timeline and return the
        timestamp at the new position.

        With ``backwards`` set to False the index is incremented, wrapping to the
        first timestamp after the last one. With ``backwards`` set to True the
        index is decremented, wrapping to the last timestamp before the first.

        In case the timeline is empty (None or empty list) return the current
        time (in UTC).

        :param tick_time: not used by this policy.
        :param backwards: step towards earlier timestamps if True.
        :return: the new simulation time.
        """
        length = self.timeline_length
        if length == 0:
            # No timeline to step through; keep the index at a valid default.
            self._driving_idx = 0
            self._curr_t_sim = datetime.utcnow()
            return self._curr_t_sim

        next_idx = self._driving_idx - 1 if backwards else self._driving_idx + 1
        if next_idx < 0:
            next_idx = length - 1
        elif next_idx >= length:
            next_idx = 0
        self._driving_idx = next_idx
        self._curr_t_sim = self.timeline[next_idx]
        return self._curr_t_sim