# Source code for uwsift.control.auto_update

import logging
from copy import deepcopy
from typing import Callable, Dict, List, Optional, Tuple
from uuid import UUID

from vispy import app

from uwsift import config
from uwsift.model.catalogue import Catalogue
from uwsift.queue import TASK_DOING, TASK_PROGRESS, TheQueue

LOG = logging.getLogger(__name__)


class StartTimeGranuleUpdatePolicy:
    """Update policy that selects the scene with the latest ``start_time``.

    The policy remembers the key of the scene it reported last and only
    reports a scene again when that key changes.
    """

    def __init__(self, query_catalogue_for_satpy_importer_args: Callable):
        """
        :param query_catalogue_for_satpy_importer_args: callable taking the
            current constraints and returning a pair of (dictionary with the
            keys ``"reader"``, ``"scenes"`` and ``"dataset_ids"``, sequence
            whose first element is handed on for loading).
        """
        self._last_scene_files = None
        self._query_catalogue_for_satpy_importer_args = query_catalogue_for_satpy_importer_args

    # Check the scenes returned from the catalogue.
    # Compare the last found scene with the scene reported in the previous
    # update, return None if nothing has changed.
    def update(self, current_constraints: Dict) -> Optional[Tuple]:
        """
        :param current_constraints: dictionary as provided by
            catalogue_settings.yaml.
        :returns: None if no new satpy scenes are available from the
            configured search dir or a tuple consisting of the reader: List
            and importer_kwargs: Dict needed to load data into SIFT.
        """
        query_result, readers = self._query_catalogue_for_satpy_importer_args(current_constraints)
        if not (query_result and readers):
            return None

        # Ascending by start time; the sort is stable, so among scenes with
        # equal start times the one listed last by the catalogue ends up last.
        scenes_by_start_time = sorted(
            query_result["scenes"].items(),
            key=lambda entry: entry[1].start_time,
        )
        if not scenes_by_start_time:
            return None

        newest_key, newest_scene = scenes_by_start_time[-1]
        has_changed = newest_key != self._last_scene_files
        if not has_changed:
            return None

        # Keep an independent copy of the key for the next comparison.
        self._last_scene_files = deepcopy(newest_key)
        importer_kwargs = {
            "reader": query_result["reader"],
            "scenes": {newest_key: newest_scene},
            "dataset_ids": query_result["dataset_ids"],
        }
        readers_to_use = [readers[0]]
        return readers_to_use, importer_kwargs
class AutoUpdateManager:
    """Drive the auto update mode: poll the catalogue on a timer and load new data.

    A timer tick queues :meth:`update` as a background task. When the update
    policy reports a new scene, the timer is stopped, the data is handed to
    the window for loading and the timer is restarted in
    :meth:`on_loading_done`, which also removes the datasets that were
    present before loading began.
    """

    def __init__(self, window, minimum_interval, search_path=None):
        """
        :param window: object providing ``document.get_uuids()``,
            ``layer_model.remove_datasets_from_all_layers()`` and
            ``open_paths()``.
        :param minimum_interval: interval handed to the ``app.Timer`` that
            triggers the update cycles.
        :param search_path: initial search path; it is replaced by the value
            extracted from the first catalogue query in the configuration.
        :raises RuntimeError: if the configuration has no catalogue query.
        """
        # "Static"
        self.search_path = search_path
        self.reader = None
        self.filter_patterns = None
        self.group_keys = None
        self.products = None
        self._window = window

        # "Dynamically patched"
        self.filter = None  # update datetime to "now"
        self.timer = None

        # State
        self._files_loaded = None
        self._old_uuids = []

        self._init_catalogue()

        # Create the policy before the timer exists, so that no tick can ever
        # reach update() while the policy attribute is still missing.
        self._auto_update_policy = StartTimeGranuleUpdatePolicy(self._query_for_satpy_importer_kwargs_and_readers)

        def update_in_background(event):
            TheQueue.add("auto update", self.update(), None)

        # Set up auto update mode timer, with minimum waiting time between
        # update cycles.
        # Minimum is exceeded if data loading time exceeds minimum_wait time.
        # Timer is paused until end of data loading to account for this.
        self.timer = app.Timer(minimum_interval, connect=update_in_background)
        self.timer.start()

    def _query_for_satpy_importer_kwargs_and_readers(self, current_constraints):
        """Query the catalogue with the stored query parameters and the given constraints."""
        return Catalogue.query_for_satpy_importer_kwargs_and_readers(
            self.reader,
            self.search_path,
            self.filter_patterns,
            self.group_keys,
            current_constraints,
            self.products,
        )

    def _init_catalogue(self):
        """Read the first catalogue query from the configuration into the attributes.

        :raises RuntimeError: if the ``catalogue`` configuration entry is
            missing or contains no query.
        """
        catalogue_config = config.get("catalogue", None)
        try:
            first_query = catalogue_config[0]
        except (TypeError, IndexError) as err:
            # TypeError: entry is None (not configured); IndexError: empty list.
            msg = "No catalogue query found in configuration."
            raise RuntimeError(msg) from err

        (
            self.reader,
            self.search_path,
            self.filter_patterns,
            self.group_keys,
            self.filter,
            self.products,
        ) = Catalogue.extract_query_parameters(first_query)

    def on_loading_done(self, uuids: List[UUID]):
        """Restart the timer and remove the datasets that predate the finished load.

        :param uuids: UUIDs that must be kept; every UUID recorded before
            loading started that is not in this list is removed from all
            layers.
        """
        # The time-consuming stuff is done, let's already start the next round
        self.timer.start()

        # Only upon completion of data loading allow for removal of old data.
        uuids_to_keep = set(uuids)
        uuids_to_remove = list(set(self._old_uuids) - uuids_to_keep)
        self._window.layer_model.remove_datasets_from_all_layers(uuids_to_remove)

        # Rebind instead of clearing in place: the recorded container may be
        # shared with whoever provided it and must not be emptied behind
        # their back.
        self._old_uuids = []

    def update(self):
        """
        Called by self.timer's tick

        Generator yielding progress dictionaries for the task queue.
        """
        yield {TASK_DOING: "Run auto update", TASK_PROGRESS: 0.0}

        # remove all but the "newest" (with respect to 'start_time') scene from
        # importer_kwargs and according reader entries in readers_to_use
        readers_importer_tup = self._auto_update_policy.update(self.filter)
        if readers_importer_tup is not None:
            files_to_load, importer_kwargs = readers_importer_tup
            # Snapshot of what is loaded right now; these are the candidates
            # for removal once the new data has arrived.
            self._old_uuids = list(self._window.document.get_uuids())

            # Stop timer to prohibit slow data loading to transition directly to
            # deletion of just loaded or still loading data
            self.timer.stop()
            try:
                self._window.open_paths(files_to_load, **importer_kwargs)
            except Exception:
                # Loading could not be started, so on_loading_done() may never
                # run; resume polling here instead of leaving auto update
                # permanently stopped.
                self.timer.start()
                raise

        yield {TASK_DOING: "Auto update finished", TASK_PROGRESS: 1.0}