import logging
import os
from collections import ChainMap
from datetime import datetime
from typing import Generator, Mapping, Optional, Tuple
from uuid import UUID
import numpy as np
import satpy.readers
from uwsift import CLEANUP_FILE_CACHE, config
from uwsift.common import Info, Kind, State
from .importer import SatpyImporter, aImporter
from .metadatabase import Content, ContentImage, Product
from .workspace import ActiveContent, BaseWorkspace, frozendict
LOG = logging.getLogger(__name__)
[docs]
class SimpleWorkspace(BaseWorkspace):
"""
Data management object for monitoring use case.
Unlike CachingWorkspace SimpleWorkspace has no database where the
datasets are saved. So every dataset which is loaded is only available while
the software is running.
SimpleWorkspace shall work with Datasets.
SimpleWorkspace has one dictionary for storing the Product objects and
one dictionary for storing the Content objects, both keyed by UUID.
"""
def __init__(self, directory_path: str):
    """
    Set up an empty, purely in-memory workspace.

    :param directory_path: workspace directory, handed on unchanged to the
        BaseWorkspace initialiser
    """
    super().__init__(directory_path)
    # UUID -> ActiveContent for content that has already been attached
    self._available: dict = {}
    # UUID -> Content and UUID -> Product; both live only in memory
    self.contents: dict = {}
    self.products: dict = {}
    # Called without a UUID, so every file in the cache directory is a
    # removal candidate (only acted upon if CLEANUP_FILE_CACHE is set).
    self.remove_content_data_from_cache_dir_checked()
@property
def _S(self):
return None
[docs]
def clear_workspace_content(self):
    """
    Forget all known products and contents.

    Both dictionaries are replaced by fresh empty ones; ``_available`` and
    the files in the cache directory are not touched here.
    """
    LOG.info("Clearing workspace contents...")
    self.products, self.contents = {}, {}
#
# data array handling
#
def _activate_content(self, c: Content) -> ActiveContent:
    """
    Create the ActiveContent for ``c`` and register it under ``c.uuid``.

    :param c: Content object to attach
    :return: the newly created ActiveContent, also stored in ``_available``
    """
    active = ActiveContent(self.cache_dir, c, self.get_info(c.uuid))
    self._available[c.uuid] = active
    # mark content and owning product as recently used
    c.touch()
    c.product.touch()
    # cache files named after this UUID are deleted again right away when
    # CLEANUP_FILE_CACHE is set
    self.remove_content_data_from_cache_dir_checked(c.uuid)
    return active
def _cached_arrays_for_content(self, c: Content):
    """
    Return the ActiveContent for ``c``, attaching it first if necessary.

    An entry already registered in ``_available`` under ``c.uuid`` is reused
    as long as it is truthy; otherwise the content is activated, which also
    touches the content and its product.

    :param c: Content object whose arrays are wanted
    :return: workspace_content_arrays
    """
    already_active = self._available.get(c.uuid)
    if already_active:
        return already_active
    return self._activate_content(c)
# FIXME: Use code from CachingWorkspace._remove_content_files_from_workspace?
[docs]
def remove_content_data_from_cache_dir_checked(self, uuid: Optional[UUID] = None):
    """Delete numpy.memmap cache files if CLEANUP_FILE_CACHE is set.

    Without ``uuid`` every entry of the cache directory is removed; with
    ``uuid`` only entries whose name starts with ``str(uuid)`` are removed.

    The clean-up is best effort: an entry that cannot be deleted (missing
    permission, already gone, or a directory) is skipped with a debug
    message instead of aborting the remaining deletions.

    :param uuid: restrict deletion to cache files of this UUID, or None for all
    """
    if not CLEANUP_FILE_CACHE:
        return

    # str(uuid) does not change while iterating, so compute it only once
    prefix = None if uuid is None else str(uuid)

    for file in os.listdir(self.cache_dir):
        if prefix is not None and not file.startswith(prefix):
            continue
        try:
            os.remove(os.path.join(self.cache_dir, file))
        except (PermissionError, FileNotFoundError, IsADirectoryError) as e:
            # PermissionError: e.g. file still in use; FileNotFoundError:
            # removed by someone else since listdir(); IsADirectoryError:
            # os.remove() refuses directories.
            LOG.debug(f"Can't delete numpy memmap cache file {file}: {e}")
#
# often-used queries
#
def _product_with_uuid(self, session, uuid: UUID) -> Optional[Product]:
    """
    Look up the Product stored under ``uuid``.

    :param session: not used by this implementation
    :param uuid: key into ``self.products``
    :return: the Product, or None if the UUID is unknown
    """
    return self.products.get(uuid)
def _product_overview_content(
    self, session, prod: Optional[Product] = None, uuid: Optional[UUID] = None, kind: Kind = Kind.IMAGE
) -> Optional[Content]:
    """
    Look up the Content stored under ``uuid``.

    Only ``uuid`` is evaluated; ``session``, ``prod`` and ``kind`` are
    accepted but not used by this implementation.

    :return: the Content, or None if there is none for the UUID
    """
    return self.contents.get(uuid)
def _product_native_content(
    self, session, prod: Optional[Product] = None, uuid: Optional[UUID] = None, kind: Kind = Kind.IMAGE
) -> Optional[Content]:
    """
    Look up the Content stored under ``uuid``.

    Performs the same dictionary lookup as ``_product_overview_content``:
    only ``uuid`` is evaluated, ``session``, ``prod`` and ``kind`` are not
    used by this implementation.

    :return: the Content, or None if there is none for the UUID
    """
    return self.contents.get(uuid)
#
# combining queries with data content
#
def _overview_content_for_uuid(self, uuid: UUID, kind: Kind = Kind.IMAGE) -> np.memmap:
    """
    Return the data of the (activated) content stored for ``uuid``.

    :param uuid: UUID whose content is wanted; content must exist
    :param kind: passed on to the content lookup
    :return: the ``data`` attribute of the content's ActiveContent
    """
    overview = self._product_overview_content(None, uuid=uuid, kind=kind)
    assert overview is not None  # nosec B101
    return self._cached_arrays_for_content(overview).data
#
# workspace file management
#
@property
def _total_workspace_bytes(self):
return None
def _all_product_uuids(self) -> list:
return [self.products[p] for p in self.products]
[docs]
def get_info(self, info_or_uuid, lod=None) -> Optional[frozendict]:
    """
    Get the metadata dictionary for the Product referenced by info_or_uuid.

    If content is stored for the product, its metadata is layered on top of
    the product metadata (content entries win on key clashes).

    :param info_or_uuid: a UUID, the string form of a UUID, or an existing
        dataset info mapping containing ``Info.UUID``
    :param lod: desired level of detail to focus; not used by this implementation
    :return: metadata access with mapping semantics, to be treated as
        read-only, or None if no product is known for the UUID
    """
    # FUTURE deprecate this
    if isinstance(info_or_uuid, UUID):
        uuid = info_or_uuid
    elif isinstance(info_or_uuid, str):
        uuid = UUID(info_or_uuid)
    else:
        uuid = info_or_uuid[Info.UUID]

    prod = self._product_with_uuid(None, uuid)
    if not prod:  # then it hasn't had its metadata scraped
        LOG.error("no info available for UUID {}".format(info_or_uuid))
        LOG.error("known products: {}".format(repr(self._all_product_uuids())))
        return None

    kind = prod.info[Info.KIND]
    native_content = self._product_native_content(None, prod=prod, uuid=uuid, kind=kind)
    if native_content is None:
        # Product metadata only: flatten to one read-only namespace.
        return frozendict(prod.info)

    # FUTURE: upgrade to a finer grained query and/or deprecate .get_info.
    # The old model was product == content, sharing a UUID with the dataset;
    # a lot of client code still assumes resource == product == content and
    # that singular navigation (e.g. cell_size) is the norm. Therefore the
    # native content metadata is provided along with the product metadata.
    # FIXME DEBUG <- since commit 3576ff0122bd534f83422ce19479d40b7dc9e5b0
    assert kind in [Kind.LINES, Kind.POINTS] or native_content.info[Info.CELL_WIDTH] is not None  # nosec B101
    return frozendict(ChainMap(native_content.info, prod.info))
[docs]
def purge_content_for_product_uuids(self, uuids: list, also_products=False):
    """
    Do nothing; both arguments are ignored by this implementation.

    :param uuids: UUIDs whose content would be purged
    :param also_products: whether the products themselves would be purged too
    """
    return None
[docs]
def bgnd_task_complete(self):
    """Do nothing; this implementation has no work to perform here."""
    return None
[docs]
def import_product_content(
    self,
    uuid: UUID,
    prod: Optional[Product] = None,
    allow_cache=True,
    merge_target_uuid: Optional[UUID] = None,
    **importer_kwargs,
) -> np.memmap:
    """
    Import the content of a product and return its data array.

    :param uuid: UUID of the product to import; used to look up ``prod``
        when that is not given
    :param prod: the Product to import, optional
    :param allow_cache: not used by this implementation
    :param merge_target_uuid: UUID of an existing product the new data is to
        be merged into; if given, the entry for ``uuid`` is dropped from
        ``self.products`` and the merge target is passed to the importer
    :param importer_kwargs: passed on to ``aImporter.from_product``
        (``merge_target`` is always set/overwritten here)
    :return: the data of the imported (or merge target) content, or None if
        no importer was created because there is nothing to import
    """
    if prod is None and uuid is not None:
        prod = self._product_with_uuid(None, uuid)
    assert prod  # nosec B101 # suppress mypy [union-attr]

    if merge_target_uuid:
        merge_target = self._product_with_uuid(None, merge_target_uuid)
        importer_kwargs["merge_target"] = merge_target
        self.products.pop(uuid, None)
    else:
        importer_kwargs["merge_target"] = None

    self.set_product_state_flag(prod.uuid, State.ARRIVING)
    default_prod_kind = prod.info[Info.KIND]

    if merge_target_uuid and len(prod.content):
        LOG.info("product already has content available, using that rather than re-importing")
        ovc = self._product_overview_content(None, uuid=uuid, kind=default_prod_kind)
        assert ovc is not None  # nosec B101
        arrays = self._cached_arrays_for_content(ovc)
        # NOTE(review): State.ARRIVING set above is not cleared on this
        # path - confirm that this is intended.
        return arrays.data

    truck = aImporter.from_product(prod, workspace_cwd=self.cache_dir, database_session=None, **importer_kwargs)
    if not truck:
        # aImporter.from_product() didn't return an Importer instance
        # since all files represent data granules, which are already
        # loaded and merged into existing datasets.
        # Thus: nothing to do.
        return None

    try:
        name = prod.info[Info.SHORT_NAME]
        nupd = 0
        for update in truck.begin_import_products(prod):
            nupd += 1
            # The input file is read incrementally; every update may carry
            # progress information and/or the Content created so far.
            if update.data is not None:
                LOG.info("{} {}: {:.01f}%".format(name, update.stage_desc, update.completion * 100.0))
            if update.content is not None:
                self.contents[update.uuid] = update.content
        LOG.debug("received {} updates during import".format(nupd))
        self._clear_product_state_flag(prod.uuid, State.ARRIVING)

        # activate the Content now that it has been imported; this already
        # yields the data array itself (or None), not an ActiveContent
        overview_data = self._overview_content_for_uuid(
            merge_target_uuid if merge_target_uuid else prod.uuid, kind=default_prod_kind
        )
    finally:
        # Issue #415: When done with the importer processing we should
        # release the importer resources if applicable - also when the
        # import fails half-way.
        truck.release_resources()

    return overview_data
[docs]
def find_merge_target(self, uuid: UUID, paths, info) -> Optional[Product]:
    """
    Try to find an existing product into which the to-be-imported files
    could be merged.

    :param uuid: uuid of the product which is about to be imported and
        might be merged with an existing product (not evaluated here)
    :param paths: the paths which should be imported or merged
    :param info: metadata for the to-be-imported product
    :return: the first existing product the new content can be merged into,
        or None if no existing product is compatible
    """
    reader = info["reader"]
    group_keys = config.get(f"data_reading.{reader}.group_keys", None)

    for candidate in self.products.values():
        # Skip products which are incomplete (being imported right now) and
        # products with a different reader, shape or family.
        # TODO: when loading granules without resampling after granules
        #  of same FAMILY have been loaded *with* resampling already,
        #  merging must be prevented.
        #  As an attempt to achieve this the SHAPE comparison checks whether
        #  they are compatible for array broadcasting, but that is only a
        #  necessary, not a sufficient condition.
        #  Idea: modify the FAMILY info by adding grid information?
        if not candidate.content:
            continue
        known = candidate.info
        if reader != known["reader"]:
            continue
        if info[Info.SHAPE][0] > known[Info.SHAPE][0]:
            continue
        if info[Info.SHAPE][1] != known[Info.SHAPE][1]:
            continue
        if info[Info.FAMILY] != known[Info.FAMILY]:
            continue

        # The candidate looks compatible; now check whether satpy would put
        # the new files and the candidate's already loaded files into one
        # and the same group.
        first_content = candidate.content[0]
        all_files = set(first_content.source_files) if first_content else set()
        all_files |= set(paths)
        grouped_files = satpy.readers.group_files(all_files, reader=reader, group_keys=group_keys)

        if len(grouped_files) != 1:
            continue
        only_group = grouped_files[0]
        if len(only_group) == 1 and reader in only_group and len(all_files) == len(only_group[reader]):
            return candidate

    return None
def _create_product_from_array(
    self, info: Mapping, data, namespace=None, codeblock=None
) -> Tuple[UUID, Optional[frozendict], np.memmap]:
    """
    Register an in-memory array as a new product of this workspace.

    Side effects include:
        The array is written to ``<uuid>.image`` in the cache directory via
        np.memmap. A Product and a ContentImage are created from ``info``
        plus the derived content fields and stored in `products` and
        `contents`; finally the content is activated.

    Args:
        info: mapping of key-value metadata for new product; must contain
            Info.UUID
        data: ndarray with content to store, typically 2D float32
        namespace: {variable: uuid, } for calculation of this data
        codeblock: text, code to run to recalculate this data within namespace

    Returns:
        uuid, info, data: uuid of the new product, its official read-only
        metadata, and cached content ndarray

    Raises:
        ValueError: if ``info`` has no Info.UUID entry
    """
    if Info.UUID not in info:
        raise ValueError("currently require an Info.UUID be included in product")

    now = datetime.utcnow()
    parms = dict(info)
    parms["atime"] = now
    parms["mtime"] = now

    P = Product.from_info(parms, symbols=namespace, codeblock=codeblock)
    uuid = P.uuid
    # FUTURE: add expression and namespace information, which would require additional parameters

    # Write memmap to disk, for later reference by workspace
    ws_filename = f"{uuid!s}.image"
    ws_path = os.path.join(self.cache_dir, ws_filename)
    with open(ws_path, "wb+") as fp:
        mm = np.memmap(fp, dtype=data.dtype, shape=data.shape, mode="w+")
        mm[:] = data[:]

    # Extend the metadata by the content fields, including the (relative)
    # name of the cached memmap .image file
    parms.update(
        {
            "lod": ContentImage.LOD_OVERVIEW,
            "path": ws_filename,
            "dtype": str(data.dtype),
            "proj4": info[Info.PROJ],
            "resolution": min(info[Info.CELL_WIDTH], info[Info.CELL_HEIGHT]),
        }
    )
    # one entry per available dimension of data, in this order
    parms.update(zip(("rows", "cols", "levels"), data.shape))
    LOG.debug("about to create Content with this: {}".format(repr(parms)))

    C = ContentImage.from_info(parms, only_fields=True)
    P.content.append(C)
    self.contents[uuid] = C
    self.products[uuid] = P

    # activate the content we just loaded into the workspace
    overview_data = self._overview_content_for_uuid(uuid)
    return uuid, self.get_info(uuid), overview_data
def _bgnd_remove(self, uuid: UUID):
    """
    Generator removing the product ``uuid`` from all in-memory registries.

    Yields a progress dictionary before (0.5) and after (1.0) the entries
    for ``uuid`` have been dropped from ``_available``, ``contents`` and
    ``products``. Unknown UUIDs are tolerated.

    :param uuid: UUID of the product to remove
    """
    from uwsift.queue import TASK_DOING, TASK_PROGRESS

    def log_inventory(when: str):
        # same three debug lines before and after the removal
        LOG.debug(f"Products {when} deletion: {list(self.products)}")
        LOG.debug(f"Contents {when} deletion: {list(self.contents)}")
        LOG.debug(f"Active Content {when} deletion: {list(self._available)}")

    yield {TASK_DOING: "purging memory", TASK_PROGRESS: 0.5}

    log_inventory("before")
    self._deactivate_content_for_product(self._product_with_uuid(None, uuid))
    self.contents.pop(uuid, None)
    self.products.pop(uuid, None)
    log_inventory("after")

    yield {TASK_DOING: "purging memory", TASK_PROGRESS: 1.0}
[docs]
def get_content(self, info_or_uuid, lod=None, kind: Kind = Kind.IMAGE) -> Optional[np.memmap]:
    """
    Return the data of the content stored for a dataset.

    :param info_or_uuid: existing datasetinfo dictionary, or its UUID
        (as UUID object or string); None yields None
    :param lod: desired level of detail to focus (0 for overview); not
        evaluated by this implementation
    :param kind: kind of the data referenced by info_or_uuid
    :return: the ``data`` of the content's ActiveContent
    :raises AssertionError: if no content of the requested kind is stored
        for the UUID
    """
    if info_or_uuid is None:
        return None

    if isinstance(info_or_uuid, UUID):
        uuid = info_or_uuid
    elif isinstance(info_or_uuid, str):
        uuid = UUID(info_or_uuid)
    else:
        uuid = info_or_uuid[Info.UUID]

    # The current implementation, where self.contents is a dict, cannot
    # support multiple contents per Product/UUID which seems to be prepared
    # in the original implementation, now found in
    # CachingWorkspace.get_content().
    # TODO(FUTURE): Update this in case CachingWorkspace is dropped and/or
    #  multiple contents per Product/UUID are implemented, see below.
    content = self.contents.get(uuid)
    # content without an Info.KIND entry counts as Kind.IMAGE
    if not content or content.info.get(Info.KIND, Kind.IMAGE) != kind:
        raise AssertionError("no content in workspace for {}, must re-import".format(uuid))

    # FIXME: find the content for the requested LOD, then return its
    #  ActiveContent - or attach one
    #  for now, just work with assumption of one product one content
    return self._cached_arrays_for_content(content).data
def _deactivate_content_for_product(self, p: Optional[Product]):
    """
    Drop the ActiveContent entries of all contents of product ``p``.

    :param p: product whose contents are to be deactivated; None is a no-op
    """
    if p is not None:
        for content in p.content:
            # contents that were never activated are silently ignored
            self._available.pop(content.uuid, None)
def _get_active_content_by_uuid(self, uuid: UUID) -> Optional[ActiveContent]:
    """
    Return the ActiveContent registered under ``uuid``, if any.

    :param uuid: key into ``_available``
    :return: the ActiveContent, or None if nothing is active for the UUID
    """
    active = self._available.get(uuid)
    return active