import logging
import os
import shutil
from collections import OrderedDict
from datetime import datetime
from typing import Dict, Generator, Mapping, Optional, Tuple
from uuid import UUID
import numpy as np
from sqlalchemy.orm.exc import NoResultFound
from uwsift.common import Info, Kind, State
from uwsift.queue import TASK_DOING, TASK_PROGRESS
from .importer import SatpyImporter, aImporter
from .metadatabase import Content, ContentImage, Metadatabase, Product, Resource
from .workspace import ActiveContent, BaseWorkspace, frozendict
LOG = logging.getLogger(__name__)

# Bounds on the size (in gigabytes) of the flat-file cache a workspace may keep.
MIN_WORKSPACE_SIZE = 8
DEFAULT_WORKSPACE_SIZE = 256

# Importer classes made available by this module.
IMPORT_CLASSES = [SatpyImporter]

# The first instance is the main singleton instance; importing from another
# workspace later on is deliberately not ruled out.
TheWorkspace = None
[docs]
class CachingWorkspace(BaseWorkspace):
    """Data management and cache object.

    CachingWorkspace is meant to be used as a singleton object that works with
    datasets. Its responsibilities are to:

    - own a working directory full of recently used datasets
    - provide DatasetInfo dictionaries for shorthand use between application subsystems
      - datasetinfo dictionaries are ordinary python dictionaries containing [Info.UUID],
        projection metadata, LOD info
    - identify datasets primarily with a UUID object which tracks the dataset and
      its various representations through the system
    - unpack data in "packing crate" formats like NetCDF into memory-compatible flat files
    - efficiently create on-demand subsections and strides of raster data as numpy arrays
    - incrementally cache often-used subsections and strides ("image pyramid") using appropriate tools like gdal
    - notify subscribers of changes to datasets (Qt signal/slot pub-sub)
    - during idle, clean out unused/idle data content, given DatasetInfo contents provides enough metadata to recreate
    - interface to external data processing or loading plug-ins and notify application of new-dataset-in-workspace

    Product/Content/Resource metadata is kept in an SQLite-backed ``Metadatabase``
    file inside the workspace directory; flat data files live in the cache directory.

    NOTE(review): several items above (e.g. image pyramids, idle-time cleanup,
    Qt notifications) are not visibly implemented in this class; confirm whether
    they live in BaseWorkspace or are still design goals.
    """
@property
def _S(self):
"""
use scoped_session registry of metadatabase to provide thread-local session object.
ref http://docs.sqlalchemy.org/en/latest/orm/contextual.html
Returns:
"""
return self._inventory.SessionRegistry
@property
def metadatabase(self) -> Metadatabase:
    """Return the Metadatabase instance that backs this workspace's inventory."""
    inventory = self._inventory
    return inventory
def __init__(
    self,
    directory_path: str,
    process_pool=None,
    max_size_gb=DEFAULT_WORKSPACE_SIZE,
    queue=None,
    initial_clear=False,
):
    """Attach to (or create) a caching workspace.

    Args:
        directory_path: workspace directory; if None, a temporary directory is
            created and kept alive for the lifetime of this object.
        process_pool: accepted for interface compatibility; not used here.
        max_size_gb: maximum size in gigabytes of flat files cached in the
            workspace; values below MIN_WORKSPACE_SIZE are raised to it.
        queue: task queue forwarded to BaseWorkspace.
        initial_clear: if True, delete the inventory database and binary cache
            before attaching; otherwise attach to the existing contents.
    """
    tempdir = None
    if directory_path is None:
        # A directory name was not given, so we need a temporary directory.
        # It must exist *before* the base class is initialized so BaseWorkspace
        # receives a real path rather than None.
        import tempfile

        tempdir = tempfile.TemporaryDirectory()
        # str(TemporaryDirectory) is a repr-like string, not the path; use .name
        directory_path = tempdir.name
    super(
        CachingWorkspace,
        self,
    ).__init__(directory_path, queue=queue)
    if tempdir is not None:
        # keep a reference so the directory is not cleaned up prematurely
        self._tempdir = tempdir
        LOG.info("using temporary directory {}".format(directory_path))
    self._max_size_gb = max_size_gb  # maximum size in gigabytes of flat files we cache in the workspace
    if self._max_size_gb < MIN_WORKSPACE_SIZE:
        self._max_size_gb = MIN_WORKSPACE_SIZE
        LOG.warning("setting workspace size to %dGB" % self._max_size_gb)
    # filename to store and load inventory information (simple cache)
    self._inventory_path = os.path.join(self.cwd, "_inventory.db")
    if initial_clear:
        self.clear_workspace_content()
    else:
        LOG.info("attaching pre-existing workspace at {}".format(directory_path))
        self._own_cwd = False
    self._init_inventory_existing_datasets()
def _init_create_workspace(self):
    """Attach the SQLite metadatabase, creating its tables if the file is new.

    Raises:
        EnvironmentError: if the directory that should hold the inventory
            database does not exist.
    """
    fresh_database = not os.path.exists(self._inventory_path)
    db_dir = os.path.dirname(self._inventory_path)
    if not os.path.isdir(db_dir):
        raise EnvironmentError("workspace directory {} does not exist".format(db_dir))
    action = "initializing" if fresh_database else "attaching"
    LOG.info("{} database at {}".format(action, self._inventory_path))
    # the metadatabase is an SQLAlchemy-backed store in a single SQLite file
    self._inventory = Metadatabase("sqlite:///" + self._inventory_path, create_tables=fresh_database)
    if fresh_database:
        # sanity check: a freshly created database must not contain Content rows
        with self._inventory as session:
            assert 0 == session.query(Content).count()  # nosec B101
    LOG.info("done with init")
[docs]
def clear_workspace_content(self):
    """Remove binary files from workspace and workspace database.

    Deletes the inventory database file and the whole binary cache directory.
    A missing database file or cache directory is not an error. Other errors
    while removing the cache directory are logged as warnings rather than
    raised, so that part of the clearing stays best-effort.
    """
    LOG.info("Clearing workspace contents...")
    try:
        os.remove(self._inventory_path)
    except FileNotFoundError:
        LOG.debug("No inventory DB file found to remove: {}".format(self._inventory_path))
    try:
        shutil.rmtree(self.cache_dir)
    except FileNotFoundError:
        LOG.debug("No binary cache directory found to remove: {}".format(self.cache_dir))
    except OSError as err:
        # e.g. permission problems; these used to be misreported as "not found"
        LOG.warning("Could not fully remove binary cache directory {}: {}".format(self.cache_dir, err))
def _purge_missing_content(self):
    """Delete Content rows whose cached files can no longer be attached.

    Each Content entry is checked with ``ActiveContent.can_attach`` against the
    cache directory. Entries that fail are detached from their Product (when
    they have one) and deleted from the metadatabase.
    """
    LOG.debug("purging Content no longer available in the cache")
    with self._inventory as session:
        missing = []
        for content in session.query(Content).all():
            if ActiveContent.can_attach(self.cache_dir, content):
                continue
            LOG.warning("purging missing content {}".format(content.path))
            missing.append(content)
        LOG.debug(
            "{} content entities no longer present in cache - will remove from database".format(len(missing))
        )
        for content in missing:
            try:
                content.product.content.remove(content)
            except AttributeError:
                # the content has no product attached
                LOG.warning("orphaned content {}??, removing".format(content.path))
            session.delete(content)
def _purge_inaccessible_resources(self):
    """Delete Resource rows whose ``exists()`` check fails.

    Logs each purged resource and a final count of discarded entries.
    """
    LOG.debug("purging any resources that are no longer accessible")
    with self._inventory as s:
        resall = s.query(Resource).all()
        n_purged = 0
        for r in resall:
            if not r.exists():
                # the "{}" placeholder used to be left unfilled
                LOG.info("resource {} no longer exists, purging from database".format(r))
                n_purged += 1
                s.delete(r)
        LOG.info("discarded metadata for {} orphaned resources".format(n_purged))
def _purge_orphan_products(self):
    """Delete Products that have neither cached Content nor a Resource to re-import from."""
    LOG.debug(
        "purging Products no longer recoverable by re-importing from Resources, "
        "and having no Content representation in cache"
    )
    with self._inventory as session:
        purged = 0
        # SIFT/sift#180: load everything up front to avoid locking the database too long
        products = list(session.query(Product).all())
        for product in products:
            unrecoverable = len(product.content) == 0 and len(product.resource) == 0
            if not unrecoverable:
                continue
            purged += 1
            session.delete(product)
        LOG.info("discarded metadata for {} orphaned products".format(purged))
def _migrate_metadata(self):
"""Replace legacy metadata uses with new uses."""
def _bgnd_startup_purge(self):
    """Run the startup database clean-up steps, yielding a status dict before each.

    Every step first yields ``{TASK_DOING: message, TASK_PROGRESS: fraction}``
    and then performs its operation once resumed. The final "DB ready" status
    has no operation attached.
    """
    stages = (
        ("DB pruning cache entries", lambda: self._purge_missing_content()),
        ("DB pruning stale resources", lambda: self._purge_inaccessible_resources()),
        ("DB pruning orphan products", lambda: self._purge_orphan_products()),
        ("DB migrating metadata", lambda: self._migrate_metadata()),
        ("DB ready", None),
    )
    total = float(len(stages))
    for index, (message, action) in enumerate(stages, start=1):
        yield {TASK_DOING: message, TASK_PROGRESS: float(index) / total}
        if action is not None:
            action()
def _init_inventory_existing_datasets(self):
"""
Do an inventory of an pre-existing workspace
FIXME: go through and check that everything in the workspace makes sense
FIXME: check workspace subdirectories for helper sockets and mmaps
:return:
"""
# attach the database, creating it if needed
self._init_create_workspace()
for _ in self._bgnd_startup_purge():
# SIFT/sift#180 -- background thread of lengthy database operations can cause lock failure in pysqlite
pass
#
# data array handling
#
def _remove_content_files_from_workspace(self, c: Content):
    """Delete the cache files referenced by a Content entry.

    Args:
        c: Content whose ``path``, ``coverage_path`` and ``sparsity_path`` files
            (relative to the cache directory) are removed; empty entries are skipped.

    Returns:
        Total size in bytes of the referenced files that existed when visited.
    """
    total = 0
    for filename in (c.path, c.coverage_path, c.sparsity_path):
        if not filename:
            continue
        pn = os.path.join(self.cache_dir, filename)
        # EAFP: stat directly instead of checking os.path.exists first, which left
        # a window where the file could vanish before os.stat raised. Any OSError
        # is treated like "does not exist", matching os.path.exists semantics.
        try:
            size = os.stat(pn).st_size
        except OSError:
            continue
        LOG.debug("removing {}".format(pn))
        total += size
        try:
            os.remove(pn)
        except FileNotFoundError:
            LOG.warning("could not remove {} - file not found; continuing".format(pn))
    return total
def _activate_content(self, c: Content) -> ActiveContent:
    """Wrap a Content entry in an ActiveContent, register it, and mark it used.

    The new ActiveContent is stored in ``self._available`` under ``c.id``. Both
    the Content and its Product are then touched so that LRU bookkeeping sees
    them as recently used.
    """
    active = ActiveContent(self.cache_dir, c, self.get_info(c.uuid))
    self._available[c.id] = active
    c.touch()
    c.product.touch()
    return active
def _cached_arrays_for_content(self, c: Content):
    """Return the ActiveContent for ``c``, activating it if it is not already available.

    Activation (``_activate_content``) also touches the Content and its Product
    so that LRU bookkeeping sees them as recently used. A cache hit does not
    touch them.

    :param c: metadatabase Content object for session attached to current thread
    :return: ActiveContent wrapping the cached arrays
    """
    cache_entry = self._available.get(c.id)
    # Test presence explicitly rather than truthiness, so an entry that happens
    # to evaluate as falsy is not re-activated and overwritten.
    if cache_entry is not None:
        return cache_entry
    return self._activate_content(c)
#
# often-used queries
#
def _product_with_uuid(self, session, uuid: UUID) -> Product:
    """Return the Product whose UUID matches ``uuid``, or None if there is none.

    Args:
        session: metadatabase session to query with
        uuid: product UUID (compared via its string form)
    """
    matches = session.query(Product).filter_by(uuid_str=str(uuid))
    return matches.first()
def _product_overview_content(
    self, session, prod: Optional[Product] = None, uuid: Optional[UUID] = None, kind: Kind = Kind.IMAGE
) -> Optional[Content]:
    """Return the overview Content of a product.

    Args:
        session: metadatabase session to query with
        prod: Product to inspect; looked up by ``uuid`` when omitted
        uuid: UUID used to find the Product if ``prod`` is None
        kind: content kind; IMAGE content comes from ContentImage sorted by
            ascending ``lod``, other kinds are filtered by their ``Info.KIND``

    Returns:
        The first matching Content (for IMAGE: the smallest ``lod``), or None if
        the product is unknown or has no matching content.
    """
    if prod is None and uuid is not None:
        try:
            prod = session.query(Product).filter(Product.uuid_str == str(uuid)).one()
        except NoResultFound:
            LOG.error("No product with UUID {} found".format(uuid))
            return None
    assert prod  # nosec B101 # suppress mypy [union-attr]

    if kind != Kind.IMAGE:
        candidates = [
            c
            for c in session.query(Content).filter(Content.product_id == prod.id)
            if c.info.get(Info.KIND, Kind.IMAGE) == kind
        ]
    else:
        candidates = (
            session.query(ContentImage).filter(ContentImage.product_id == prod.id).order_by(ContentImage.lod).all()
        )
    return candidates[0] if candidates else None
def _product_native_content(
    self, session, prod: Optional[Product] = None, uuid: Optional[UUID] = None, kind: Kind = Kind.IMAGE
) -> Optional[Content]:
    """Return a product's native (highest level-of-detail) Content.

    Args:
        session: metadatabase session to query with
        prod: Product to inspect; looked up by ``uuid`` when omitted
        uuid: UUID used to find the Product if ``prod`` is None
        kind: content kind; IMAGE content comes from ContentImage, other kinds
            are filtered by their ``Info.KIND``

    Returns:
        The last matching Content (for IMAGE: the one with the largest ``lod``),
        or None if the product is unknown or has no matching content.
    """
    # NOTE: This assumes the last Content object is the best resolution,
    # but it is untested
    if prod is None and uuid is not None:
        # Get Product object
        try:
            prod = session.query(Product).filter(Product.uuid_str == str(uuid)).one()
        except NoResultFound:
            LOG.error("No product with UUID {} found".format(uuid))
            return None
    assert prod  # nosec B101 # suppress mypy [union-attr]
    if kind == Kind.IMAGE:
        # Sort ascending so the *last* entry has the highest lod (lod 0 is the
        # overview, cf. get_content). The previous descending sort combined with
        # taking the last entry returned the overview instead of native content.
        contents = (
            session.query(ContentImage).filter(ContentImage.product_id == prod.id).order_by(ContentImage.lod).all()
        )
    else:
        contents = session.query(Content).filter(Content.product_id == prod.id)
        contents = [c for c in contents if c.info.get(Info.KIND, Kind.IMAGE) == kind]
    return contents[-1] if contents else None
#
# combining queries with data content
#
def _overview_content_for_uuid(self, uuid: UUID, kind: Kind = Kind.IMAGE) -> np.memmap:
    """Return the data array of a product's overview content, attaching it if needed.

    Raises:
        AssertionError: (when assertions are enabled) if the product has no
            overview content of ``kind``.
    """
    # FUTURE: do a compound query for this to get the Content entry
    with self._inventory as session:
        overview = self._product_overview_content(session, uuid=uuid, kind=kind)
        assert overview is not None  # nosec B101
        return self._cached_arrays_for_content(overview).data
#
# workspace file management
#
@property
def _total_workspace_bytes(self):
"""
total number of bytes in the workspace by brute force instead of metadata search
:return:
"""
total = 0
for root, _, files in os.walk(self.cache_dir):
sz = sum(os.path.getsize(os.path.join(root, name)) for name in files)
total += sz
LOG.debug("%d bytes in %s" % (sz, root))
return total
def _all_product_uuids(self) -> list:
    """Return the UUIDs of every Product in the metadatabase."""
    with self._inventory as session:
        products = session.query(Product).all()
        return [product.uuid for product in products]
# ----------------------------------------------------------------------
[docs]
def get_info(self, info_or_uuid, lod=None) -> Optional[frozendict]:
    """Look up read-only metadata for a product.

    :param info_or_uuid: existing datasetinfo dictionary, its UUID, or a UUID string
    :param lod: desired level of detail to focus (currently unused)
    :return: frozendict with mapping semantics, or None if no product has that
        UUID. When native content exists, its metadata is layered over the
        product's metadata (content keys take precedence).
    """
    from collections import ChainMap

    # FUTURE deprecate this
    if isinstance(info_or_uuid, UUID):
        uuid = info_or_uuid
    elif isinstance(info_or_uuid, str):
        uuid = UUID(info_or_uuid)
    else:
        uuid = info_or_uuid[Info.UUID]

    with self._inventory as s:
        prod = self._product_with_uuid(s, uuid)
        if not prod:
            # then it hasn't had its metadata scraped
            LOG.error("no info available for UUID {}".format(info_or_uuid))
            LOG.error("known products: {}".format(repr(self._all_product_uuids())))
            return None

        kind = prod.info[Info.KIND]
        native_content = self._product_native_content(s, prod=prod, kind=kind)
        if native_content is None:
            # mapping semantics for database fields, as well as key-value fields;
            # flatten to one namespace and read-only
            return frozendict(prod.info)

        # FUTURE: upgrade to a finer grained query and/or deprecate .get_info.
        # The old model was product == content sharing a UUID with the dataset,
        # and a lot of client code still assumes resource == product == content
        # with singular navigation (e.g. cell_size), so native content metadata
        # is provided along with the product metadata.
        # FIXME DEBUG <- since commit 3576ff0122bd534f83422ce19479d40b7dc9e5b0
        assert (  # nosec B101
            kind in [Kind.LINES, Kind.POINTS] or native_content.info[Info.CELL_WIDTH] is not None
        )
        return frozendict(ChainMap(native_content.info, prod.info))
@property
def product_names_available_in_cache(self) -> dict:
    """Map product UUIDs to display names for products that have cached Content.

    Products appear in order of their most recently accessed Content (newest
    first). Content rows without a Product are skipped. Typically used for the
    add-from-cache dialog.

    Returns: dictionary of {UUID: product name,...}
    """
    # find non-overview non-auxiliary data files
    # FIXME: also need to include coverage and sparsity paths?? really?
    zult = {}
    with self._inventory as s:
        for c in s.query(Content).order_by(Content.atime.desc()).all():
            p = c.product
            if p is None:
                # orphaned Content (no Product) can exist in the database;
                # it has no display name, and p.uuid would raise AttributeError
                continue
            if p.uuid not in zult:
                zult[p.uuid] = p.info[Info.DISPLAY_NAME]
    return zult
[docs]
def recently_used_products(self, n=32) -> Dict[UUID, str]:
    """Return up to ``n`` products, most recently accessed first.

    Returns:
        OrderedDict mapping product UUID to its display name.
    """
    with self._inventory as session:
        query = session.query(Product).order_by(Product.atime.desc()).limit(n)
        recent = OrderedDict()
        for product in query.all():
            recent[product.uuid] = product.info[Info.DISPLAY_NAME]
        return recent
def _purge_content_for_resource(self, resource: Resource, session, defer_commit=False):
    """Remove all cached content derived from a resource.

    Deletes the cache files of every Content belonging to the resource's
    products and deletes those Content rows. If the resource's original path no
    longer exists, the Resource row is deleted as well. Whether that cascades to
    its products depends on the ORM mapping, which is not visible here.

    :param resource: Resource whose products' content should be purged
    :param session: metadatabase session to use; when None, the thread-local
        scoped session registry (``self._S``) is used instead
    :param defer_commit: if True, do not commit; forced to True when a session
        is supplied, because then the caller owns the transaction
    :return: number of bytes freed from the workspace
    """
    if session is not None:
        # the caller owns this session and therefore the transaction
        defer_commit = True
        S = session
    else:
        # fall back to the thread-local scoped session; previously S stayed
        # None here and the first S.delete() raised AttributeError
        S = self._S
    total = 0
    for prod in resource.product:
        for con in prod.content:
            total += self._remove_content_files_from_workspace(con)
            S.delete(con)
    if not resource.exists():  # then purge the resource as well
        S.delete(resource)
    if not defer_commit:
        S.commit()
    return total
[docs]
def purge_content_for_product_uuids(self, uuids: list, also_products=False):
    """Purge cached Content for the given products.

    Content that is currently active (its id is in ``self._available``) is
    skipped with a warning. All other content has its cache files removed and
    its rows deleted. UUIDs with no matching product are skipped with a warning.

    Args:
        uuids: iterable of product UUIDs
        also_products: if True, also delete the Product rows themselves

    Returns:
        Total number of bytes removed from the cache directory.
    """
    total = 0
    for uuid in uuids:
        with self._inventory as s:
            prod = s.query(Product).filter_by(uuid_str=str(uuid)).first()
            if prod is None:
                LOG.warning("no product with UUID {} found; nothing to purge".format(uuid))
                continue
            # iterate over a copy: prod.content is modified inside the loop
            for con in list(prod.content):
                if con.id in self._available:
                    LOG.warning("will not purge active content!")
                    continue
                total += self._remove_content_files_from_workspace(con)
                prod.content.remove(con)
                s.delete(con)
            if also_products:
                # NOTE(review): if active content was skipped above, deleting the
                # product may orphan or cascade-delete it depending on the ORM
                # mapping -- confirm intended behavior.
                s.delete(prod)
    return total
def _clean_cache(self):
    """Evict cached content, least recently used Resources first, until under the size limit.

    The current size is measured by walking the cache directory. Resources are
    then visited in ascending ``atime`` order, and their content is purged until
    the remaining size drops below ``self._max_size_gb`` gigabytes.
    Intended to eventually be runnable on a background thread; a workspace
    setting for the maximum size in bytes may be added later.
    """
    gigabyte = 1024**3
    with self._inventory as S:
        LOG.info("cleaning cache")
        total_size = self._total_workspace_bytes
        LOG.info("total cache size is {}GB of max {}GB".format(total_size / gigabyte, self._max_size_gb))
        max_size = self._max_size_gb * gigabyte
        for res in S.query(Resource).order_by(Resource.atime).all():
            if total_size < max_size:
                break
            total_size -= self._purge_content_for_resource(res, session=S)
[docs]
def close(self):
    """Close the workspace; currently this only trims the cache to its size limit."""
    cleaner = self._clean_cache
    cleaner()
[docs]
def bgnd_task_complete(self):
    """Hook for work to do at the end of a threaded background task; nothing is needed here."""
    return None
[docs]
def import_product_content(
    self,
    uuid: UUID,
    prod: Optional[Product] = None,
    allow_cache=True,
    merge_target_uuid: Optional[UUID] = None,
    **importer_kwargs,
) -> np.memmap:
    """Load a product's data into the workspace cache and return its overview array.

    If the product already has Content, that cached content is attached and
    returned without re-importing. Otherwise an importer is obtained from
    ``aImporter.from_product`` and run to completion. The product's ARRIVING
    state flag is set for the duration and always cleared afterwards.

    Args:
        uuid: UUID of the product (used to look it up when ``prod`` is None)
        prod: Product to import, if the caller already has it
        allow_cache: accepted for interface compatibility; not used here
        merge_target_uuid: accepted for interface compatibility; not used here
        **importer_kwargs: forwarded to ``aImporter.from_product``

    Returns:
        The overview data array, or None when the product is unknown, or when the
        importer reports nothing to do (all granules already merged into
        existing datasets).
    """
    with self._inventory as S:
        if prod is None and uuid is not None:
            prod = self._product_with_uuid(S, uuid)
        if prod is None:
            LOG.error("No product with UUID {} found; cannot import content".format(uuid))
            return None

        self.set_product_state_flag(prod.uuid, State.ARRIVING)
        try:
            default_prod_kind = prod.info[Info.KIND]
            if len(prod.content):
                LOG.info("product already has content available, using that rather than re-importing")
                # pass prod itself: uuid may be None when the caller supplied prod
                ovc = self._product_overview_content(S, prod=prod, kind=default_prod_kind)
                assert ovc is not None  # nosec B101
                arrays = self._cached_arrays_for_content(ovc)
                return arrays.data

            truck = aImporter.from_product(prod, workspace_cwd=self.cache_dir, database_session=S, **importer_kwargs)
            if not truck:
                # aImporter.from_product() didn't return an Importer instance
                # since all files represent data granules, which are already
                # loaded and merged into existing datasets.
                # Thus: nothing to do.
                return None

            name = prod.info[Info.SHORT_NAME]
            # FIXME: for now, just iterate the incremental load.
            # later we want to add this to TheQueue and update the UI as we get more data loaded
            nupd = 0
            for update in truck.begin_import_products(prod.id):
                nupd += 1
                # Content is in the metadatabase and being updated + committed,
                # including sparsity and coverage arrays
                if update.data is not None:
                    LOG.info("{} {}: {:.01f}%".format(name, update.stage_desc, update.completion * 100.0))
            LOG.debug("received {} updates during import".format(nupd))
            uuid = prod.uuid
        finally:
            # always clear ARRIVING; the early returns above used to leave it set
            self._clear_product_state_flag(prod.uuid, State.ARRIVING)

        # _overview_content_for_uuid already returns the data array (not an
        # ActiveContent); taking `.data` of it would yield the raw memory buffer.
        return self._overview_content_for_uuid(uuid, kind=default_prod_kind)
def _create_product_from_array(
    self, info: Mapping, data, namespace=None, codeblock=None
) -> Tuple[UUID, Optional[frozendict], np.memmap]:
    """Store a calculated array in the workspace as a new Product with one Content.

    Updates the metadatabase with Product and Content entries for the new dataset,
    so the calculated data resides in the workspace. Then returns the "official"
    versions consistent with the workspace product/content database.

    Args:
        info: mapping of key-value metadata for the new product; must contain
            Info.UUID, Info.PROJ, Info.CELL_WIDTH and Info.CELL_HEIGHT
        data: ndarray with content to store, typically 2D float32
        namespace: {variable: uuid, } for calculation of this data
        codeblock: text, code to run to recalculate this data within namespace

    Returns:
        uuid, info, data: uuid of the new product, its official read-only
        metadata, and cached content ndarray

    Raises:
        ValueError: if ``info`` has no Info.UUID entry
    """
    if Info.UUID not in info:
        raise ValueError("currently require an Info.UUID be included in product")
    parms = dict(info)
    now = datetime.utcnow()
    parms.update(
        dict(
            atime=now,
            mtime=now,
        )
    )
    P = Product.from_info(parms, symbols=namespace, codeblock=codeblock)
    uuid = P.uuid
    # FUTURE: add expression and namespace information, which would require additional parameters
    ws_filename = "{}.image".format(str(uuid))
    ws_path = os.path.join(self.cache_dir, ws_filename)
    with open(ws_path, "wb+") as fp:
        mm = np.memmap(fp, dtype=data.dtype, shape=data.shape, mode="w+")
        mm[:] = data[:]
        # make sure the bytes reach the file, and release this mapping, before
        # the file is re-attached as ActiveContent below
        mm.flush()
        del mm
    parms.update(
        dict(
            lod=ContentImage.LOD_OVERVIEW,
            path=ws_filename,
            dtype=str(data.dtype),
            proj4=info[Info.PROJ],
            resolution=min(info[Info.CELL_WIDTH], info[Info.CELL_HEIGHT]),
        )
    )
    rcls = dict(zip(("rows", "cols", "levels"), data.shape))
    parms.update(rcls)
    LOG.debug("about to create Content with this: {}".format(repr(parms)))
    C = ContentImage.from_info(parms, only_fields=True)
    P.content.append(C)
    # FUTURE: do we identify a Resource to go with this? Probably not
    # transaction with the metadatabase to add the product and content
    with self._inventory as S:
        S.add(P)
        S.add(C)
    # FIXME: Do I have to flush the session so the Product gets added for sure?
    # activate the content we just loaded into the workspace
    overview_data = self._overview_content_for_uuid(uuid)
    return uuid, self.get_info(uuid), overview_data
def _bgnd_remove(self, uuid: UUID):
    """Background task: drop the active content of a product, yielding progress.

    Yields a status dict at progress 0.5 and another at 1.0, around the removal
    of the product's Content ids from ``self._available``.
    """
    yield {TASK_DOING: "purging memory", TASK_PROGRESS: 0.5}
    LOG.debug(f"Active Content before deletion: {list(self._available.keys())}")
    with self._inventory as session:
        product = self._product_with_uuid(session, uuid)
        self._deactivate_content_for_product(product)
    LOG.debug(f"Active Content after deletion: {list(self._available.keys())}")
    yield {TASK_DOING: "purging memory", TASK_PROGRESS: 1.0}
[docs]
def get_content(self, info_or_uuid, lod=None, kind: Kind = Kind.IMAGE) -> Optional[np.memmap]:
    """Get the best-available (closest to native) array view of a product's data.

    :param info_or_uuid: existing datasetinfo dictionary, its UUID, or a UUID string
    :param lod: desired level of detail to focus (0 for overview); currently unused
    :param kind: content kind to look for
    :return: data array of the matching Content with the highest ``lod`` (IMAGE)
        or the first matching Content (other kinds); None if ``info_or_uuid`` is None
    :raises AssertionError: if the workspace holds no matching content
    """
    if info_or_uuid is None:
        return None
    elif isinstance(info_or_uuid, UUID):
        uuid = info_or_uuid
    elif isinstance(info_or_uuid, str):
        uuid = UUID(info_or_uuid)
    else:
        uuid = info_or_uuid[Info.UUID]
    # TODO: this causes a locking exception when run in a secondary thread.
    # Keeping background operations lightweight makes sense however, so just review this
    with self._inventory as s:
        if kind == Kind.IMAGE:
            content = (
                s.query(ContentImage)
                .filter((Product.uuid_str == str(uuid)) & (ContentImage.product_id == Product.id))
                .order_by(ContentImage.lod.desc())
                .all()
            )
        else:
            content = s.query(Content).filter((Product.uuid_str == str(uuid)) & (Content.product_id == Product.id))
            content = [x for x in content if x.info.get(Info.KIND, Kind.IMAGE) == kind]
        if len(content) > 1:
            # warn only on genuine ambiguity; zero matches are reported below
            LOG.warning("More than one matching Content object for '{}'".format(info_or_uuid))
        if not len(content) or not content[0]:
            raise AssertionError("no content in workspace for {}, must re-import".format(uuid))
        content = content[0]
        # FIXME: find the content for the requested LOD, then return its
        # ActiveContent - or attach one
        # for now, just work with assumption of one product one content
        active_content = self._cached_arrays_for_content(content)
        return active_content.data
def _deactivate_content_for_product(self, p: Optional[Product]):
    """Forget any ActiveContent registered for the given product's Content entries.

    A None product is ignored, and Content ids that are not active are skipped.
    """
    if p is not None:
        for content in p.content:
            self._available.pop(content.id, None)
def _get_active_content_by_uuid(self, uuid: UUID) -> Optional[ActiveContent]:
    """Return the ActiveContent for a product's single Content entry, if it is active.

    Returns None when no product has ``uuid``, when the product has no Content
    yet, or when its Content has not been activated.

    Raises:
        sqlalchemy.orm.exc.MultipleResultsFound: if the product has more than one
            Content entry (unchanged behavior of ``Query.one``).
    """
    with self._inventory as s:
        prod = self._product_with_uuid(s, uuid)
        if prod is None:
            return None
        try:
            content = s.query(Content).filter(Content.product_id == prod.id).one()
        except NoResultFound:
            # a product without imported content cannot have active content
            return None
        return self._available.get(content.id)