#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
uwsift.model.document
---------------------
The document is an interface to further process some user interactions and delegate the import of new content to the
workspace. It also contains all metadata information of all loaded records.
The document handles the following tasks:
- import new files
- instruct the workspace to import new content
- create a Presentation using metadata information
- manage the currently active area definition used to present the data
- manage user color maps
The communication between the document and other parts of the application is done with signal/slot connections.
Document has zero or more Colormaps, determining how they're presented
The document does not own data (content). It only owns metadata (info).
All entities in the Document have a UUID that is their identity throughout their lifecycle,
and is often used as shorthand between subsystems. Document rarely deals directly with content.
:author: R.K.Garcia <rayg@ssec.wisc.edu> and others
:copyright: 2015 by University of Wisconsin Regents, see AUTHORS for more details
:license: GPLv3, see LICENSE for more details
"""
from __future__ import annotations
__author__ = "rayg"
__docformat__ = "reStructuredText"
import json
import logging
import os
import typing as typ
from uuid import UUID
from PyQt5.QtCore import QObject, pyqtSignal
from uwsift.common import Info, Kind, Presentation
from uwsift.model.area_definitions_manager import AreaDefinitionsManager
from uwsift.queue import TASK_DOING, TASK_PROGRESS, TaskQueue
from uwsift.util.common import get_initial_gamma, units_conversion
from uwsift.util.default_paths import DOCUMENT_SETTINGS_DIR
from uwsift.view.colormap import (
COLORMAP_MANAGER,
SITE_CATEGORY,
USER_CATEGORY,
PyQtGraphColormap,
)
from uwsift.workspace import BaseWorkspace, CachingWorkspace, SimpleWorkspace
from uwsift.workspace.metadatabase import Product
LOG = logging.getLogger(__name__)
###################################################################################################################
[docs]
class Document(QObject): # base class is rightmost, mixins left of that
"""Storage for dataset info and user information.
This is the low-level "internal" interface that acts as a signaling hub.
Direct access to the document is being deprecated.
Most direct access patterns should be migrated to using a contextual view of the document,
in order to reduce abstraction leakage and permit the document storage to evolve.
"""
# signals
# Emitted by activate_product_uuid_as_new_dataset() with the dataset's info dict and its
# freshly created Presentation.
didAddDataset = pyqtSignal(dict, Presentation)
# Emitted by import_files() with the merge target's UUID and Kind after new content was
# merged into an existing dataset.
didUpdateBasicDataset = pyqtSignal(UUID, Kind)
# Emitted by change_projection() when the current area definition actually changes.
didChangeProjection = pyqtSignal(str) # name of projection (area definition)
# NOTE(review): not emitted anywhere in the visible part of this class - confirm it is still used.
didReorderTracks = pyqtSignal(set, set) # added track names, removed track names
# Emitted by update_user_colormap() after the colormap was (re)registered.
didUpdateUserColormap = pyqtSignal(str) # name of colormap which has an update
def __init__(
    self,
    workspace: BaseWorkspace,
    queue: TaskQueue,
    config_dir=DOCUMENT_SETTINGS_DIR,
    **kwargs,
):
    """Initialise the document and load the colormaps stored below ``config_dir``.

    :param workspace: workspace the document delegates content import and metadata lookup to
    :param queue: task queue, kept as ``self.queue``
    :param config_dir: settings directory; it and its ``colormaps/site`` and
        ``colormaps/user`` sub-directories are created when missing
    :param kwargs: passed through unchanged to the ``QObject`` initialiser
    """
    super(Document, self).__init__(**kwargs)
    self.config_dir = config_dir
    self.queue = queue
    if not os.path.isdir(self.config_dir):
        LOG.debug("Creating settings directory {}".format(self.config_dir))
        # exist_ok: another process may create the directory between the
        # check above and this call; that must not abort start-up.
        os.makedirs(self.config_dir, exist_ok=True)

    self._workspace = workspace
    self._info_by_uuid: typ.Dict[UUID, dict] = {}  # dict(uuid:frozendict)
    self.colormaps = COLORMAP_MANAGER
    self.default_area_def_name = AreaDefinitionsManager.default_area_def_name()
    self.current_area_def_name = self.default_area_def_name

    cmap_base_dir = os.path.join(self.config_dir, "colormaps")
    self.read_cmap_dir = os.path.join(cmap_base_dir, "site")  # read-only
    self.write_cmap_dir = os.path.join(cmap_base_dir, "user")  # writeable

    importable_cmap_cats = [
        (True, SITE_CATEGORY, self.read_cmap_dir),
        (False, USER_CATEGORY, self.write_cmap_dir),
    ]
    for read_only, cmap_cat, cmap_dir in importable_cmap_cats:
        if os.path.exists(cmap_dir):
            # Only a directory that existed beforehand is imported from.
            self.colormaps.import_colormaps(cmap_dir, read_only=read_only, category=cmap_cat)
        else:
            # Create the missing colormap directory; exist_ok for the same
            # race reason as for the settings directory above.
            os.makedirs(cmap_dir, exist_ok=True)
[docs]
def find_colormap(self, colormap):
    """Resolve a colormap name to the registered colormap object.

    :param colormap: a colormap name or any other object
    :return: ``self.colormaps[colormap]`` when ``colormap`` is a string that is
        registered in ``self.colormaps``; otherwise ``colormap`` unchanged
    """
    is_registered_name = isinstance(colormap, str) and colormap in self.colormaps
    return self.colormaps[colormap] if is_registered_name else colormap
[docs]
def area_definition(self, area_definition_name=None):
    """Look up an area definition by name.

    :param area_definition_name: name to look up; a falsy value selects the
        document's current area definition name
    :return: whatever ``AreaDefinitionsManager.area_def_by_name`` returns for that name
    """
    name = area_definition_name or self.current_area_def_name
    return AreaDefinitionsManager.area_def_by_name(name)
[docs]
def change_projection(self, area_def_name=None):
    """Switch the current projection (area definition) and signal the change.

    Nothing happens when the requested name is already current.

    :param area_def_name: name of the area definition to activate; ``None``
        selects the default area definition
    """
    requested = self.default_area_def_name if area_def_name is None else area_def_name
    assert requested in AreaDefinitionsManager.available_area_def_names()  # nosec B101
    if requested == self.current_area_def_name:
        return
    LOG.debug(
        f"Changing projection (area definition) from"
        f" '{self.current_area_def_name}' to '{requested}'"
    )
    self.current_area_def_name = requested
    self.didChangeProjection.emit(self.current_area_def_name)
[docs]
def update_user_colormap(self, colormap, name):
    """Persist a user colormap as JSON, register it and announce the update.

    A failure to write the file is logged but does not prevent the colormap
    from being registered and the update signal from being emitted.

    :param colormap: JSON-serializable colormap description, also passed to ``PyQtGraphColormap``
    :param name: colormap name; the file is ``<name>.json`` in ``self.write_cmap_dir``
    """
    # Serialize before opening the file so that a non-serializable colormap
    # cannot leave a truncated/empty file behind.
    serialized = json.dumps(colormap, indent=2, sort_keys=True)
    cmap_path = os.path.join(self.write_cmap_dir, name + ".json")
    try:
        # The context manager closes the handle even when the write fails.
        with open(cmap_path, "w") as cmap_file:
            cmap_file.write(serialized)
    except IOError:
        LOG.error("Error saving gradient: {}".format(name), exc_info=True)

    cmap = PyQtGraphColormap(colormap)
    self.colormaps[name] = cmap

    # Update live map
    self.didUpdateUserColormap.emit(name)
[docs]
def remove_user_colormap(self, name):
    """Delete a user colormap's JSON file and unregister the colormap.

    A missing file is not an error; any other failure to delete the file is
    logged and the colormap is unregistered anyway.

    :param name: name of the colormap to remove
    :raises KeyError: if ``name`` is not registered in ``self.colormaps``
    """
    # Same directory update_user_colormap() writes to.
    cmap_path = os.path.join(self.write_cmap_dir, name + ".json")
    try:
        os.remove(cmap_path)
    except FileNotFoundError:
        # The colormap was never written to disk; nothing to delete.
        pass
    except OSError:
        LOG.warning("Could not delete colormap file {}".format(cmap_path), exc_info=True)
    del self.colormaps[name]
[docs]
def current_projection_index(self):
    """Return the position of the current area definition name among the available names.

    :raises ValueError: if the current name is not among the available names
    """
    available_names = list(AreaDefinitionsManager.available_area_def_names())
    return available_names.index(self.current_area_def_name)
[docs]
def change_projection_index(self, idx):
    """Activate the area definition found at position ``idx`` of the available names.

    :param idx: index into the available area definition names
    :return: the result of :meth:`change_projection`
    """
    available_names = tuple(AreaDefinitionsManager.available_area_def_names())
    selected_name = available_names[idx]
    return self.change_projection(selected_name)
def _insert_dataset_with_info(self, info: dict, cmap=None, style=None, insert_before=0):
    """Build the initial Presentation for a dataset; no signal is emitted.

    :param info: dataset metadata; must contain ``Info.UUID`` and ``Info.KIND``
    :param cmap: colormap to use; ``None`` falls back to ``info.get(Info.COLORMAP)``
    :param style: style to use; ``None`` falls back to ``info.get(Info.STYLE)``
    :param insert_before: accepted for interface compatibility, not used here
    :return: the new Presentation (visible, fully opaque)
    """
    chosen_cmap = info.get(Info.COLORMAP) if cmap is None else cmap
    chosen_style = info.get(Info.STYLE) if style is None else style

    initial_gamma = get_initial_gamma(info)
    value_range = self._workspace.get_range_for_dataset_no_fail(info)

    return Presentation(
        uuid=info[Info.UUID],
        kind=info[Info.KIND],
        visible=True,
        colormap=chosen_cmap,
        style=chosen_style,
        climits=value_range,
        gamma=initial_gamma,
        opacity=1.0,
    )
[docs]
def activate_product_uuid_as_new_dataset(self, uuid: UUID, insert_before=0, **importer_kwargs):
    """Import a product's content and register it as a new dataset of the document.

    For a UUID that is already registered only the content import is
    triggered again. Otherwise the workspace info is copied, completed with
    unit conversion and family where missing, stored under the UUID and
    announced through ``didAddDataset``.

    :param uuid: UUID of the product to activate
    :param insert_before: forwarded to :meth:`_insert_dataset_with_info`
    :param importer_kwargs: forwarded to the workspace's ``import_product_content``
    """
    already_loaded = uuid in self._info_by_uuid
    if already_loaded:
        LOG.debug("dataset already loaded: {}".format(uuid))

    # FUTURE: Load this async, the slots for the signal emitted below need
    # to be OK with that
    self._workspace.import_product_content(uuid, **importer_kwargs)
    if already_loaded:
        return

    # The metadata is read only after the import because it then carries the
    # content information (most importantly navigation information).
    frozen_info = self._workspace.get_info(uuid)
    assert frozen_info is not None  # nosec B101
    dataset_info = dict(frozen_info)  # mutable copy to which entries can be added
    LOG.debug("cell_width: {}".format(repr(dataset_info[Info.CELL_WIDTH])))
    LOG.debug("new dataset info: {}".format(repr(dataset_info)))
    self._info_by_uuid[uuid] = dataset_info

    if Info.UNIT_CONVERSION not in dataset_info:
        dataset_info[Info.UNIT_CONVERSION] = units_conversion(dataset_info)
    if Info.FAMILY not in dataset_info:
        dataset_info[Info.FAMILY] = self._family_for_product_or_info(dataset_info)

    new_presentation = self._insert_dataset_with_info(dataset_info, insert_before=insert_before)

    # signal updates from the document
    self.didAddDataset.emit(dataset_info, new_presentation)
def _family_for_product_or_info(self, uuid_or_info):
    """Return the family identifier for a product UUID or a dataset info mapping.

    For a UUID the workspace is asked first; if it has no family, the info
    obtained via ``self[uuid]`` is used. For an info mapping a stored
    ``Info.FAMILY`` is returned, otherwise the family is composed as
    ``kind:pointofreference:measurement:subcategory``.

    :param uuid_or_info: product UUID or dataset info mapping
    :return: the family identifier
    :raises KeyError: if a UUID is unknown to both document and workspace, or
        the info lacks an entry needed to compose the family
    """
    if isinstance(uuid_or_info, UUID):
        if isinstance(self._workspace, CachingWorkspace):
            with self._workspace.metadatabase as s:
                row = s.query(Product.family).filter_by(uuid_str=str(uuid_or_info)).first()
            if row:
                # Single-column query: the family is the row's first element.
                return row[0]
        elif isinstance(self._workspace, SimpleWorkspace):
            workspace_info = self._workspace.get_info(uuid_or_info)
            # Return the stored value itself; indexing it with [0] would
            # yield only the first character of the family string.
            family = workspace_info.get(Info.FAMILY) if workspace_info else None
            if family:
                return family
        # Any other workspace type, or no family found: fall back to the info.
        uuid_or_info = self[uuid_or_info]

    if Info.FAMILY in uuid_or_info:
        LOG.debug("using pre-existing family {}".format(uuid_or_info[Info.FAMILY]))
        return uuid_or_info[Info.FAMILY]

    # kind:pointofreference:measurement:wavelength
    kind = uuid_or_info[Info.KIND]
    refpoint = "unknown"  # FUTURE: geo/leo
    measurement = uuid_or_info.get(Info.STANDARD_NAME)
    if uuid_or_info.get("recipe"):
        # RGB
        subcat = uuid_or_info["recipe"].name
    elif uuid_or_info.get(Info.CENTRAL_WAVELENGTH):
        # basic band
        subcat = uuid_or_info[Info.CENTRAL_WAVELENGTH]
    else:
        # higher level product or algebraic dataset
        subcat = uuid_or_info[Info.DATASET_NAME]
    return "{}:{}:{}:{}".format(kind.name, refpoint, measurement, subcat)
[docs]
def import_files(self, paths, insert_before=0, **importer_kwargs) -> typ.Generator[dict, None, None]:
    """Load product metadata and content from the provided file paths.

    This is a generator: it first yields one progress dict per product while
    metadata is collected and then one per product while content is loaded.
    Each dict carries ``TASK_DOING``, ``TASK_PROGRESS``, ``"uuid"`` and
    ``"num_products"``.

    :param paths: paths to open
    :param insert_before: where to insert them in layer manager
    :param importer_kwargs: forwarded to the workspace; ``merge_with_existing``
        defaults to True and is disabled when ``resampling_info`` is set
    :return: generator of progress dicts
    :raises ValueError: if no products are available for ``paths``
    """
    # NOTE: if the importer argument 'merge_with_existing' is not set it
    #  defaults to True here.
    # TODO(AR) make 'merge_with_existing' an explicit argument to this
    #  method.
    merge_requested = importer_kwargs.get("merge_with_existing", True)
    do_merge_with_existing = merge_requested and not importer_kwargs.get("resampling_info")
    # Pass the outcome on consistently to all further import steps.
    importer_kwargs["merge_with_existing"] = do_merge_with_existing

    # Load all the metadata first; the collection order is assumed to be the
    # most user-friendly one.
    product_infos = self._workspace.collect_product_metadata_for_paths(paths, **importer_kwargs)

    collected_uuids = []
    target_by_uuid = {}  # new product uuid -> uuid of the dataset to merge into
    total_products = 0
    for position, (num_prods, info) in enumerate(product_infos):
        uuid = info[Info.UUID]
        merge_target_uuid = uuid
        if do_merge_with_existing:
            # info["paths"] holds the real paths: for satpy imports the
            # method's `paths` parameter actually contains the reader names
            merge_target = self._workspace.find_merge_target(uuid, info["paths"], info)
            if merge_target:
                merge_target_uuid = merge_target.uuid

        done = position + 1
        yield {
            TASK_DOING: "Collecting metadata {}/{}".format(done, num_prods),
            TASK_PROGRESS: float(done) / float(num_prods),
            "uuid": merge_target_uuid,
            "num_products": num_prods,
        }

        # redundant but also more explicit than depending on num_prods
        total_products = num_prods
        collected_uuids.append(uuid)
        target_by_uuid[uuid] = merge_target_uuid

    if not total_products:
        raise ValueError("no products available in {}".format(paths))

    # Second pass: import the content of every collected product.
    for position, uuid in enumerate(collected_uuids):
        merge_target_uuid = target_by_uuid[uuid]
        if do_merge_with_existing and uuid != merge_target_uuid:
            # merge the product into the already existing dataset
            active_content_data = self._workspace.import_product_content(
                uuid, merge_target_uuid=merge_target_uuid, **importer_kwargs
            )
            # active_content_data is None if all segments are already loaded
            # and there is nothing new to import
            if active_content_data:
                target_info = self[merge_target_uuid]
                self.didUpdateBasicDataset.emit(merge_target_uuid, target_info[Info.KIND])
        elif uuid in self._info_by_uuid:
            LOG.warning("dataset with UUID {} already in document?".format(uuid))
            self._workspace.get_content(uuid)
        else:
            self.activate_product_uuid_as_new_dataset(uuid, insert_before=insert_before, **importer_kwargs)

        done = position + 1
        yield {
            TASK_DOING: "Loading content {}/{}".format(done, total_products),
            TASK_PROGRESS: float(done) / float(total_products),
            "uuid": merge_target_uuid,
            "num_products": total_products,
        }
[docs]
def sort_product_uuids(self, uuids: typ.Iterable[UUID]) -> typ.List[UUID]:
    """Return the given product UUIDs ordered by family, category and serial.

    Only available with a ``CachingWorkspace``; the order comes from its
    metadatabase, so only UUIDs found there are part of the result.

    :param uuids: product UUIDs to sort
    :return: list of UUIDs in metadatabase order; empty for empty input
    """
    assert isinstance(self._workspace, CachingWorkspace)  # nosec B101
    wanted = {str(uuid) for uuid in uuids}
    if not wanted:
        return []
    with self._workspace.metadatabase as session:
        ordered_query = (
            session.query(Product)
            .filter(Product.uuid_str.in_(wanted))
            .order_by(Product.family, Product.category, Product.serial)
        )
        uuid_ident_pairs = [(product.uuid, product.ident) for product in ordered_query.all()]
    LOG.debug("sorted products: {}".format(repr(uuid_ident_pairs)))
    return [pair[0] for pair in uuid_ident_pairs]
# TODO: add a document style guide which says how different bands from different instruments are displayed
[docs]
def get_uuids(self):
    """Return a new list with the UUIDs of all datasets the document holds info for."""
    return [uuid for uuid in self._info_by_uuid]
def __getitem__(self, dataset_uuid):
    """Return the dataset info for the given UUID.

    The document's own info is preferred; the workspace is only asked for
    UUIDs the document does not hold.

    :param dataset_uuid: UUID of the dataset
    :return: the info stored in the document or provided by the workspace
    :raises KeyError: if the UUID is ``None`` or unknown to document and workspace
    :raises ValueError: if ``dataset_uuid`` is not a ``UUID``
    """
    if dataset_uuid is None:
        raise KeyError("Key 'None' does not exist in document or workspace")
    if not isinstance(dataset_uuid, UUID):
        raise ValueError("document[UUID] required, %r was used" % type(dataset_uuid))

    if dataset_uuid in self._info_by_uuid:
        return self._info_by_uuid[dataset_uuid]

    # check the workspace for information
    LOG.debug("Checking workspace for information on inactive product")
    try:
        workspace_info = self._workspace.get_info(dataset_uuid)
    except KeyError:
        workspace_info = None

    if workspace_info is not None:
        return workspace_info
    raise KeyError("Key '{}' does not exist in document or workspace".format(dataset_uuid))
[docs]
def remove_dataset_info(self, uuid: UUID):
    """Forget the info of a dataset that is no longer needed.

    Unknown UUIDs are ignored.

    :param uuid: UUID of the dataset which is removed
    """
    LOG.debug(f"Remove dataset info of uuid {uuid}")
    if uuid in self._info_by_uuid:
        del self._info_by_uuid[uuid]