#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
PURPOSE
REFERENCES
REQUIRES
:author: R.K.Garcia <rkgarcia@wisc.edu>
:copyright: 2017 by University of Wisconsin Regents, see AUTHORS for more details
:license: GPLv3, see LICENSE for more details
"""
import gc
import logging
import os
from abc import ABC, abstractmethod
from collections import namedtuple
from datetime import datetime, timedelta
from typing import (
Callable,
Dict,
Generator,
Iterable,
List,
Mapping,
Optional,
Set,
Tuple,
Union,
)
import dask.array as da
import numpy as np
import satpy.readers.yaml_reader
import satpy.resample
import yaml
from pyresample.geometry import AreaDefinition, StackedAreaDefinition, SwathDefinition
from satpy import DataQuery, Scene, available_readers
from satpy.dataset import DatasetDict
from satpy.writers import get_enhanced_image
from xarray import DataArray
from uwsift import USE_INVENTORY_DB, config
from uwsift.common import INSTRUMENT_MAP, PLATFORM_MAP, Info, Instrument, Kind, Platform
from uwsift.model.area_definitions_manager import AreaDefinitionsManager
from uwsift.satpy_compat import DataID, get_id_items, get_id_value, id_from_attrs
from uwsift.util import USER_CACHE_DIR
from uwsift.util.common import get_reader_kwargs_dict
from uwsift.workspace.guidebook import ABI_AHI_Guidebook
from .metadatabase import (
Content,
ContentImage,
ContentMultiChannelImage,
ContentUnstructuredPoints,
Product,
Resource,
)
from .utils import metadata_utils
_SATPY_READERS = None # cache: see `available_satpy_readers()` below
SATPY_READER_CACHE_FILE = os.path.join(USER_CACHE_DIR, "available_satpy_readers.yaml")
LOG = logging.getLogger(__name__)
satpy_version = None
DEFAULT_GTIFF_OBS_DURATION = timedelta(seconds=60)
DEFAULT_GUIDEBOOK = ABI_AHI_Guidebook
GUIDEBOOKS = {
Platform.GOES_16: ABI_AHI_Guidebook,
Platform.GOES_17: ABI_AHI_Guidebook,
Platform.GOES_18: ABI_AHI_Guidebook,
Platform.GOES_19: ABI_AHI_Guidebook,
Platform.HIMAWARI_8: ABI_AHI_Guidebook,
Platform.HIMAWARI_9: ABI_AHI_Guidebook,
}
import_progress = namedtuple(
"import_progress",
["uuid", "stages", "current_stage", "completion", "stage_desc", "dataset_info", "data", "content"],
)
"""
# stages:int, number of stages this import requires
# current_stage:int, 0..stages-1 , which stage we're on
# completion:float, 0..1 how far we are along on this stage
# stage_desc:tuple(str), brief description of each of the stages we'll be doing
"""
try:
import hdf5plugin # noqa F401 # suppress "'hdf5plugin' imported but unused"
except ImportError:
LOG.warning(
"Your installation/environment doesn't include the package hdf5plugin. If you want to visualise "
'compressed FCI L1c data, please add this package using "pip install hdf5plugin" or '
'"conda install -c conda-forge hdf5plugin "..'
)
def _load_satpy_readers_cache(force_refresh=None):
"""Get Satpy reader information from a cache file or Satpy itself."""
if force_refresh is None:
force_refresh = os.getenv("UWSIFT_SATPY_CACHE_REFRESH", "False").lower()
force_refresh = force_refresh in [True, "true"]
try:
if force_refresh:
raise RuntimeError("Forcing refresh of available Satpy readers list")
if not satpy_version:
raise RuntimeError("Satpy version cannot be determined, regenerating available readers...")
with open(SATPY_READER_CACHE_FILE, "r") as cfile:
LOG.info("Loading cached available Satpy readers from {}".format(SATPY_READER_CACHE_FILE))
cache_contents = yaml.load(cfile, yaml.SafeLoader)
if cache_contents is None:
raise RuntimeError("Cached reader list is empty, regenerating...")
if cache_contents["satpy_version"] != satpy_version:
raise RuntimeError("Satpy has different version, regenerating available readers...")
except (FileNotFoundError, RuntimeError, KeyError) as cause:
LOG.info("Updating list of available Satpy readers...")
cause.__suppress_context__ = True
readers = available_readers(as_dict=True)
# sort list of readers just in case we depend on this in the future
readers = sorted(readers, key=lambda x: x["name"])
readers = list(_sanitize_reader_info_for_yaml(readers))
cache_contents = {
"satpy_version": satpy_version,
"readers": readers,
}
_save_satpy_readers_cache(cache_contents)
return cache_contents["readers"]
def _sanitize_reader_info_for_yaml(readers):
# filter out known python objects to simplify YAML serialization
for reader_info in readers:
reader_info.pop("reader")
reader_info.pop("data_identification_keys", None)
reader_info["config_files"] = list(reader_info["config_files"])
yield reader_info
def _save_satpy_readers_cache(cache_contents):
"""Write reader cache information to a file on disk."""
cfile_dir = os.path.dirname(SATPY_READER_CACHE_FILE)
os.makedirs(cfile_dir, exist_ok=True)
with open(SATPY_READER_CACHE_FILE, "w") as cfile:
LOG.info("Caching available Satpy readers to {}".format(SATPY_READER_CACHE_FILE))
yaml.dump(cache_contents, cfile)
[docs]
def available_satpy_readers(as_dict=False, force_cache_refresh=None):
"""Get a list of reader names or reader information."""
global _SATPY_READERS
if _SATPY_READERS is None or force_cache_refresh:
_SATPY_READERS = _load_satpy_readers_cache(force_refresh=force_cache_refresh)
if not as_dict:
return [r["name"] for r in _SATPY_READERS]
return _SATPY_READERS
[docs]
def filter_dataset_ids(ids_to_filter: Iterable[DataID]) -> Generator[DataID, None, None]:
"""Generate only non-filtered DataIDs based on EXCLUDE_DATASETS global filters."""
# skip certain DataIDs
for ds_id in ids_to_filter:
for filter_key, filtered_values in config.get("open_file_wizard.exclude_datasets").items():
if filtered_values and get_id_value(ds_id, filter_key) in filtered_values:
break
else:
yield ds_id
[docs]
def get_guidebook_class(dataset_info) -> ABI_AHI_Guidebook:
platform = dataset_info.get(Info.PLATFORM)
return GUIDEBOOKS.get(platform, DEFAULT_GUIDEBOOK)()
def _get_types_of_required_aux_files(scn: Scene) -> Set[str]:
"""
Get the file types of required auxiliary files in the given Scene.
For example, the types 'HRIT_PRO', 'HRIT_EPI' for the prolog/epilog files of
SEVIRI in HRIT format.
:param scn: Scene object to analyse
:return: set of required file types
"""
required_aux_file_types = set()
for reader in scn._readers.values():
for file_handlers in reader.file_handlers.values():
for file_handler in file_handlers:
required_aux_file_types |= set(file_handler.filetype_info.get("requires", ()))
return required_aux_file_types
def _get_paths_of_required_aux_files(scn: Scene, required_aux_file_types) -> Set[str]:
"""
Get set of paths of auxiliary files required to load data in Scene.
For example, prolog/epilog files for SEVIRI in HRIT format.
:param scn: Scene object to analyse
:param required_aux_file_types: the file types which are required
:return: set of required files
"""
required_aux_paths = set()
if required_aux_file_types:
for reader in scn._readers.values():
for file_handlers in reader.file_handlers.values():
for file_handler in file_handlers:
if file_handler.filetype_info["file_type"] in required_aux_file_types:
required_aux_paths.add(file_handler.filename)
return required_aux_paths
def _get_paths_in_scene_contributing_to_ds(scn: Scene, ds_name: str) -> List[str]:
"""
Get list of paths in scene which actually contain data for the dataset given
by its name `ds_name`.
:param scn: scene object to analyse
:param ds_name: dataset name
:return: paths in scene which are required to load given dataset
"""
contributing_paths = []
for reader in scn._readers.values():
for file_handlers in reader.file_handlers.values():
for file_handler in file_handlers:
if not ds_name or not hasattr(file_handler, "channel_name") or file_handler.channel_name == ds_name:
contributing_paths.append(file_handler.filename)
return contributing_paths
[docs]
class aImporter(ABC):
"""
Abstract Importer class creates or amends Resource, Product, Content entries in the metadatabase used by Workspace
aImporter instances are backgrounded by the Workspace to bring Content into the workspace
"""
def __init__(self, workspace_cwd, database_session, **kwargs):
super(aImporter, self).__init__()
# dedicated sqlalchemy database session to use during this import instance;
# revert if necessary, commit as appropriate
self._S = database_session
# where content flat files should be imported to within the workspace, omit this from content path
self._cwd = workspace_cwd
[docs]
@classmethod
def from_product(cls, prod: Product, workspace_cwd, database_session, **kwargs):
# FIXME: deal with products that need more than one resource
try:
cls = prod.resource[0].format
except IndexError:
LOG.error("no resources in {} {}".format(repr(type(prod)), repr(prod)))
raise
paths = [r.path for r in prod.resource]
# Get the Satpy Scene for this set of paths
scn = kwargs["scenes"].get(tuple(paths), None)
# Only Satpy readers / SatpyImporter still supported, thus:
# There must be such a Scene and the prod must be from Satpy, we can
assert scn and "_satpy_id" in prod.info # nosec B101
merge_target = kwargs.get("merge_target")
if merge_target:
# For the merging process it is crucial that it only has
# to deal with files which belong to the given Product `prod`,
# extraneous files would interfere with the merging process.
# Thus: Filter the files in `paths` and keep only those
# that contribute to the Satpy dataset for `prod`.
if "prerequisites" in prod.info:
paths = cls._collect_prerequisites_paths(prod, scn)
else:
paths = _get_paths_in_scene_contributing_to_ds(scn, prod.info["_satpy_id"].get("name"))
# Now we can analyse, which files must be loaded
paths = cls._extract_paths_to_merge(merge_target, paths, scn)
if not paths:
return None
# In order to "load" converted datasets, we have to reuse the existing
# Scene 'scn' from the first instantiation of SatpyImporter instead of
# loading the data again, of course: we need to 'rescue across' the
# converted data from the first SatpyImporter instantiation to the
# second one (which will be created in the very last statement of this
# method).
# Since only products of the kinds POINTS, LINES and VECTORS need
# conversion (at this time), the mechanism is only applied for these.
# For an unknown reason reusing the scene breaks for FCI data,
#
# this Importer data reading magic is too confused.
if prod.info[Info.KIND] in (Kind.POINTS, Kind.LINES, Kind.VECTORS):
del kwargs["scenes"]
kwargs["scene"] = scn
# TODO: ignore mypy error for now because in the future aImporter should be merged with SatpyImporter
return cls(paths, workspace_cwd=workspace_cwd, database_session=database_session, **kwargs) # type: ignore
@classmethod
def _extract_paths_to_merge(cls, merge_target, paths, scn):
"""
Get a subset of 'paths', filtering out all files that are already loaded
into the target product.
I.e., collect only files from the given 'paths' list that are not yet
loaded and, if necessary, auxiliary files required to load these.
"""
new_paths = []
existing_content = merge_target.content[0]
for path in paths:
if path not in existing_content.source_files:
new_paths.append(path)
if not new_paths:
return None
if new_paths != paths:
required_aux_file_types = _get_types_of_required_aux_files(scn)
required_aux_file_paths = _get_paths_of_required_aux_files(scn, required_aux_file_types)
if set(new_paths) == required_aux_file_paths:
return None
return new_paths + list(required_aux_file_paths) # TODO
return new_paths
@classmethod
def _collect_prerequisites_paths(cls, prod, scn):
# If the dataset has prerequisites - this is the case for
# RGB composites provided by Satpy - the files that are
# required by these must be collected.
paths = []
for prerequisite in prod.info.get("prerequisites"):
ds_name = prerequisite.get("name") if isinstance(prerequisite, DataQuery) else prerequisite
if ds_name not in scn.available_dataset_names():
# This is a not supported case: Merging should be
# done for an RGB composite of which prequisite
# datasets are missing in the scene. This can
# happen, when the composite depends on other
# composites - their datasets have not been created
# in the Scene at this stage, thus the Scene can't
# be queried for the files required by these datasets.
# We have to interrupt this loading process (which
# runs in its own thread) by raising an exception.
raise RuntimeError(
f"The Satpy RGB Composite type '{prod.info[Info.SHORT_NAME]}'"
f" does not work when merging of new data chunks"
f" into existing data is active."
f" Consider switching it off by configuring"
f" 'data_reading.merge_with_existing: False'"
)
paths += _get_paths_in_scene_contributing_to_ds(scn, ds_name)
# remove possible duplicates in paths
paths = list(dict.fromkeys(paths))
return paths
[docs]
@classmethod
@abstractmethod
def is_relevant(cls, source_path=None, source_uri=None) -> bool:
"""
return True if this importer is capable of reading this URI.
"""
return False
[docs]
@abstractmethod
def merge_resources(self) -> Iterable[Resource]:
"""
Returns:
sequence of Resources found at the source, typically one resource per file
"""
return []
[docs]
@abstractmethod
def merge_products(self) -> Iterable[Product]:
"""
products available in the resource, adding any metadata entries for Products within the resource
this may be run by the metadata collection agent, or by the workspace!
Returns:
sequence of Products that could be turned into Content in the workspace
"""
return []
[docs]
@abstractmethod
def begin_import_products(self, *product_ids) -> Generator[import_progress, None, None]:
"""
background import of content from a series of products
if none are provided, all products resulting from merge_products should be imported
Args:
*products: sequence of products to import
Returns:
generator which yields status tuples as the content is imported
"""
# FUTURE: this should be async def coroutine
pass
[docs]
@abstractmethod
def release_resources(self):
"""Release the resources associated to this importer."""
[docs]
def determine_dynamic_dataset_kind(attrs: dict, reader_name: str) -> str:
"""Determine kind of dataset dynamically based on dataset attributes.
This currently supports only the distinction between IMAGE and POINTS kinds.
It makes the assumption that if the dataset has a SwathDefinition, and is 1-D, it represents points.
"""
if isinstance(attrs["area"], SwathDefinition) and len(attrs["area"].shape) == 1:
data_kind = "POINTS"
else:
data_kind = "IMAGE"
LOG.info(f"Selected dynamically kind {data_kind} for dataset {attrs['name']} from reader {reader_name}")
return data_kind
[docs]
class SatpyImporter(aImporter):
"""Generic SatPy importer"""
def __init__(self, source_paths, workspace_cwd, database_session, **kwargs):
super(SatpyImporter, self).__init__(workspace_cwd, database_session)
reader = kwargs.pop("reader", None)
if reader is None:
raise NotImplementedError("Can't automatically determine reader.")
if not isinstance(source_paths, (list, tuple)):
source_paths = [source_paths]
if Scene is None:
raise ImportError("SatPy is not available and can't be used as " "an importer")
self.filenames = list(source_paths)
self.reader = reader
self.resampling_info = kwargs.get("resampling_info")
self.scn = kwargs.get("scene")
self.merge_target = kwargs.get("merge_target")
if self.scn is None:
reader_kwargs = get_reader_kwargs_dict([self.reader])
self.scn = Scene(filenames={self.reader: self.filenames}, reader_kwargs=reader_kwargs)
self._resources = []
# DataID filters
self.product_filters = {}
for k in ["resolution", "calibration", "level"]:
if k in kwargs:
self.product_filters[k] = kwargs.pop(k)
# NOTE: product_filters don't do anything if the dataset_ids aren't
# specified since we are using all available dataset ids
self.dataset_ids = kwargs.get("dataset_ids")
if self.dataset_ids is None:
self.dataset_ids = filter_dataset_ids(self.scn.available_dataset_ids())
self.dataset_ids = sorted(self.dataset_ids)
self.use_inventory_db = USE_INVENTORY_DB
self.required_aux_file_types = _get_types_of_required_aux_files(self.scn)
# NOTE: needed for an issue with resampling of NetCDF data.
self.scn_original = None # noqa - Do not simply remove
[docs]
@classmethod
def from_product(cls, prod: Product, workspace_cwd, database_session, **kwargs):
# this overrides the base class because we assume that the product
# has kwargs that we want
# NOTE: The kwargs are currently provided by the caller in
# workspace.py so this isn't needed right now
# FIXME: deal with products that need more than one resource
try:
cls = prod.resource[0].format
except IndexError:
LOG.error("no resources in {} {}".format(repr(type(prod)), repr(prod)))
raise
kwargs.pop("reader", None)
kwargs.pop("scenes", None)
kwargs.pop("scene", None)
kwargs["dataset_ids"] = [prod.info["_satpy_id"]]
filenames = [r.path for r in prod.resource]
return cls(filenames, workspace_cwd=workspace_cwd, database_session=database_session, **kwargs)
[docs]
@classmethod
def is_relevant(cls, source_path=None, source_uri=None):
# this importer should only be used if specifically requested
return False
[docs]
def release_resources(self):
"""Release the satpy scene associated to this importer."""
if self.scn_original is not None:
self.scn_original.unload()
self.scn_original = None
if self.scn is not None:
self.scn.unload()
self.scn = None
gc.collect()
[docs]
def merge_resources(self):
if len(self._resources) == len(self.filenames):
return self._resources
if self.use_inventory_db:
resources = self._S.query(Resource).filter(Resource.path.in_(self.filenames)).all()
if len(resources) == len(self.filenames):
self._resources = resources
return self._resources
now = datetime.utcnow()
res_dict = {r.path: r for r in self._resources}
for fn in self.filenames:
if fn in res_dict:
continue
res = Resource(
format=type(self),
path=fn,
mtime=now,
atime=now,
)
if self.use_inventory_db:
self._S.add(res)
res_dict[fn] = res
self._resources = res_dict.values()
return self._resources
[docs]
def merge_products(self) -> Iterable[Product]:
resources = self.merge_resources()
if resources is None:
LOG.debug("no resources for {}".format(self.filenames))
return
existing_ids = {}
if self.use_inventory_db:
resources = list(resources)
for res in resources:
products = list(res.product)
existing_ids.update({prod.info["_satpy_id"]: prod for prod in products})
existing_prods = {x: existing_ids[x] for x in self.dataset_ids if x in existing_ids}
# FIXME: products may be unset in next line, its use is suspicious anyway
if products and len(existing_prods) == len(self.dataset_ids):
products = list(existing_prods.values())
LOG.debug("pre-existing products {}".format(repr(products)))
yield from products
return
from uuid import uuid1
self._load_all_datasets()
revised_datasets = self._revise_all_datasets()
for ds_id, ds in revised_datasets.items():
# don't recreate a Product for one we already have
if ds_id in existing_ids:
yield existing_ids[ds_id]
continue
meta = ds.attrs
uuid = uuid1()
meta[Info.UUID] = uuid
meta["_satpy_id"] = ds_id
now = datetime.utcnow()
prod = Product(
uuid_str=str(uuid),
atime=now,
)
prod.resource.extend(resources)
assert Info.OBS_TIME in meta # nosec B101
assert Info.OBS_DURATION in meta # nosec B101
prod.update(meta) # sets fields like obs_duration and obs_time transparently
assert prod.info[Info.OBS_TIME] is not None and prod.obs_time is not None # nosec B101
LOG.debug("new product: {}".format(repr(prod)))
if self.use_inventory_db:
self._S.add(prod)
self._S.commit()
yield prod
def _extract_segment_number(self):
"""
Load the segment numbers loaded by this scene.
:return: list of all the segment numbers of all segments loaded by the scene
"""
segments = []
collected_segment_count = 0
for reader in self.scn._readers.values():
for file_handlers in reader.file_handlers.values():
collected_segment_count += 1
for file_handler in file_handlers:
if file_handler.filetype_info["file_type"] in self.required_aux_file_types:
collected_segment_count -= 1
continue
seg = file_handler.filename_info["segment"]
segments.append(seg)
filtered_segments = []
for segment in set(segments):
if segments.count(segment) == collected_segment_count:
filtered_segments.append(segment)
return sorted(filtered_segments)
def _extract_expected_segments(self) -> Optional[int]:
"""
Load the number of expected segments for the products loaded by the given scene.
:return: number of expected segments or None if not found or products in scene
have different numbers of expected segments
"""
expected_segments = None
for reader in self.scn._readers.values():
for file_handlers in reader.file_handlers.values():
for file_handler in file_handlers:
if file_handler.filetype_info["file_type"] in self.required_aux_file_types:
continue
es = file_handler.filetype_info["expected_segments"]
if expected_segments is None:
expected_segments = es
if expected_segments != es:
return None
return expected_segments
@property
def num_products(self) -> int:
# WARNING: This could provide radiances and higher level products
# which SIFT probably shouldn't care about
return len(self.dataset_ids)
@staticmethod
def _get_platform_instrument(attrs: dict):
"""Convert SatPy platform_name/sensor to"""
attrs[Info.INSTRUMENT] = attrs.get("sensor")
attrs[Info.PLATFORM] = attrs.get("platform_name") or attrs.get("platform_shortname")
# Special handling of GRIB forecast data
if "centreDescription" in attrs and attrs[Info.INSTRUMENT] == "unknown":
description = attrs["centreDescription"]
if attrs.get(Info.PLATFORM) is None:
attrs[Info.PLATFORM] = "NWP"
if "NCEP" in description:
attrs[Info.INSTRUMENT] = "GFS"
if attrs[Info.INSTRUMENT] in ["GFS", "unknown"]:
attrs[Info.INSTRUMENT] = Instrument.GFS
if attrs[Info.PLATFORM] in ["NWP", "unknown"]:
attrs[Info.PLATFORM] = Platform.NWP
# FUTURE: Use standard string names for platform/instrument
# instead of an Enum. Otherwise, could use a reverse
# Enum lookup to match Enum values to Enum keys.
# if we haven't figured out what these are then give up and say they are unknown
if isinstance(attrs[Info.PLATFORM], str):
plat_str = attrs[Info.PLATFORM].lower().replace("-", "")
attrs[Info.PLATFORM] = PLATFORM_MAP.get(plat_str, attrs[Info.PLATFORM])
if not attrs[Info.PLATFORM] or isinstance(attrs[Info.PLATFORM], str):
attrs[Info.PLATFORM] = Platform.UNKNOWN
if isinstance(attrs[Info.INSTRUMENT], str):
inst_str = attrs[Info.INSTRUMENT].lower().replace("-", "")
attrs[Info.INSTRUMENT] = INSTRUMENT_MAP.get(inst_str, attrs[Info.INSTRUMENT])
if not attrs[Info.INSTRUMENT] or isinstance(attrs[Info.INSTRUMENT], str):
attrs[Info.INSTRUMENT] = Instrument.UNKNOWN
def _load_all_datasets(self) -> None:
self.scn.load(self.dataset_ids, pad_data=False, upper_right_corner="NE", **self.product_filters)
# copy satpy metadata keys to SIFT keys
if self.scn.missing_datasets:
# deactivating reduce_data, see https://github.com/pytroll/satpy/issues/2476
self.scn = self.scn.resample(resampler="native", reduce_data=False)
for ds in self.scn:
self._set_name_metadata(ds.attrs)
self._set_time_metadata(ds.attrs)
self._set_kind_metadata(ds.attrs)
self._set_wavelength_metadata(ds.attrs)
self._set_shape_metadata(ds.attrs, ds.shape)
self._set_scene_metadata(ds.attrs)
self._set_family_metadata(ds.attrs)
self._set_category_metadata(ds.attrs)
self._set_serial_metadata(ds.attrs)
ds.attrs.setdefault("reader", self.reader)
@staticmethod
def _set_name_metadata(
attrs: dict, name: Optional[str] = None, short_name: Optional[str] = None, long_name: Optional[str] = None
) -> None:
if not name:
name = attrs["name"] or attrs.get(Info.STANDARD_NAME)
id_str = ":".join(str(v[1]) for v in get_id_items(id_from_attrs(attrs)))
attrs[Info.DATASET_NAME] = id_str
model_time = attrs.get("model_time")
if model_time is not None:
attrs[Info.DATASET_NAME] += " " + model_time.isoformat()
level = attrs.get("level")
if level is None:
attrs[Info.SHORT_NAME] = short_name or name
else:
attrs[Info.SHORT_NAME] = f"{short_name or name} @ {level}hPa"
attrs[Info.LONG_NAME] = long_name or name
attrs[Info.STANDARD_NAME] = attrs.get("standard_name") or name
@staticmethod
def _determine_sched_time(attrs: dict):
if "time_parameters" in attrs:
return attrs["time_parameters"]["nominal_start_time"]
if "nominal_start_time" in attrs:
return attrs["nominal_start_time"]
return attrs["start_time"]
@staticmethod
def _set_time_metadata(attrs: dict) -> None:
# TODO Review Satpy start/end times ("[{nominal,observation]_}{start,end}_times") and adjust SIFT
# terminology regarding start/end times and durations accordingly.
start_time = attrs["start_time"]
attrs[Info.OBS_TIME] = start_time
attrs[Info.SCHED_TIME] = SatpyImporter._determine_sched_time(attrs)
duration = attrs.get("end_time", start_time) - start_time
if duration.total_seconds() <= 0:
duration = timedelta(minutes=60)
attrs[Info.OBS_DURATION] = duration
def _set_kind_metadata(self, attrs: dict) -> None:
if "prerequisites" in attrs:
attrs[Info.KIND] = Kind.MC_IMAGE
return
reader_kind = config.get(f"data_reading.{self.reader}.kind", None)
if reader_kind is None:
LOG.info(f"No data kind configured for reader '{self.reader}'. Falling back to 'IMAGE'.")
attrs[Info.KIND] = Kind.IMAGE
else:
set_kind_metadata_from_reader_config(self.reader, reader_kind, attrs)
def _set_wavelength_metadata(self, attrs: dict) -> None:
self._get_platform_instrument(attrs)
if "wavelength" in attrs and attrs.get("wavelength"):
attrs.setdefault(Info.CENTRAL_WAVELENGTH, attrs["wavelength"][1])
def _set_shape_metadata(self, attrs: dict, shape) -> None:
if len(shape) == 3 and "prerequisites" in attrs.keys():
shape = (shape[1], shape[2], shape[0])
attrs[Info.SHAPE] = shape if not self.resampling_info else self.resampling_info["shape"]
attrs[Info.UNITS] = attrs.get("units")
if attrs[Info.UNITS] == "unknown":
LOG.warning("Dataset units are unknown, using '1'")
attrs[Info.UNITS] = 1
generate_guidebook_metadata(attrs)
def _set_scene_metadata(self, attrs: dict) -> None:
attrs[Info.SCENE] = attrs.get("scene_id")
if attrs[Info.SCENE] is None:
self._compute_scene_hash(attrs)
@staticmethod
def _set_family_metadata(attrs: dict) -> None:
if attrs.get(Info.CENTRAL_WAVELENGTH) is None:
cw = ""
else:
cw = ":{:5.2f}µm".format(attrs[Info.CENTRAL_WAVELENGTH])
attrs[Info.FAMILY] = "{}:{}:{}{}".format(
attrs[Info.KIND].name, attrs[Info.STANDARD_NAME], attrs[Info.SHORT_NAME], cw
)
@staticmethod
def _set_category_metadata(attrs: dict) -> None:
# system:platform:instrument:target
attrs[Info.CATEGORY] = "SatPy:{}:{}:{}".format(
attrs[Info.PLATFORM].name, attrs[Info.INSTRUMENT].name, attrs[Info.SCENE]
)
@staticmethod
def _set_serial_metadata(attrs: dict) -> None:
# TODO: Include level or something else in addition to time?
start_str = attrs["start_time"].isoformat()
if "model_time" in attrs:
model_time = attrs["model_time"].isoformat()
attrs[Info.SERIAL] = f"{model_time}:{start_str}"
else:
attrs[Info.SERIAL] = start_str
@staticmethod
def _get_area_extent(area: Union[AreaDefinition, StackedAreaDefinition]) -> List:
"""
Return the area extent of the AreaDefinition or of the nominal
AreaDefinition of the StackedAreaDefinition. The nominal AreaDefinition
is the one referenced via 'area_id' by all AreaDefinitions listed in the
StackedAreaDefinition. If there are different 'area_id's in the
StackedAreaDefinition this function fails throwing an exception.
Use case: Get the extent of the full-disk to which a non-contiguous set
of segments loaded from a segmented GEOS file format (e.g. HRIT) belongs.
:param area: AreaDefinition ar StackedAreaDefinition
:return: Extent of the nominal AreaDefinition.
:raises ValueError: when a StackedAreaDefinition with incompatible
AreaDefinitions is passed
"""
if isinstance(area, StackedAreaDefinition):
any_area_def = area.defs[0]
for area_def in area.defs:
if area_def.area_id != any_area_def.area_id:
raise ValueError(f"Different area_ids found in StackedAreaDefinition: {area}.")
area = satpy.resample.get_area_def(any_area_def.area_id)
return area.area_extent
def _compute_scene_hash(self, attrs: dict):
"""Compute a "good enough" hash and store it as
SCENE information.
The SCENE information is used at other locations to identify data that
has roughly the same extent and projection. That is a pre-requisite
to allow to derive algebraics and compositions from them.
Unstructured data has no clear extent and not an intrinsic projection
(data locations are in latitude / longitude), thus something like a
SCENE (in the sense of describing a view on a section of the earth's
surface) cannot clearly be determined for it.
"""
try:
area = (
attrs["area"]
if not self.resampling_info
else AreaDefinitionsManager.area_def_by_id(self.resampling_info["area_id"])
)
# round extents to nearest 100 meters
extents = tuple(int(np.round(x / 100.0) * 100.0) for x in self._get_area_extent(area))
attrs[Info.SCENE] = "{}-{}".format(str(extents), area.proj_str)
except (KeyError, AttributeError):
# Scattered data, this is not suitable to define a scene
attrs[Info.SCENE] = None
def _stack_data_arrays(
self, datasets: List[DataArray], attrs: dict, name_prefix: str = "", axis: int = 1
) -> DataArray:
"""
Merge multiple DataArrays into a single ``DataArray``. Use the
``attrs`` dict for the DataArray metadata. This method also copies
the Satpy metadata fields into SIFT fields. The ``attrs`` dict won't
be modified.
:param datasets: List of DataArrays
:param attrs: metadata for the resulting DataArray
:param name_prefix: if given, a prefix for the name of the new DataArray
:param axis: numpy axis index
:return: stacked Dask array
"""
# Workaround for a Dask bug: Convert all DataArrays to float32
# before calling into dask, because an int16 DataArray will be
# converted into a Series instead of a dask Array with newer
# versions. This then causes a TypeError.
meta = np.stack([da.utils.meta_from_array(ds) for ds in datasets], axis=axis)
datasets = [ds.astype(meta.dtype) for ds in datasets]
combined_data = da.stack(datasets, axis=axis)
attrs = attrs.copy()
ds_id = attrs["_satpy_id"]
name = f"{name_prefix or ''}{attrs['name']}"
attrs["_satpy_id"] = DataID(ds_id.id_keys, name=name)
self._set_name_metadata(attrs, name)
self._set_time_metadata(attrs)
self._set_kind_metadata(attrs)
self._set_wavelength_metadata(attrs)
self._set_shape_metadata(attrs, combined_data.shape)
guidebook = get_guidebook_class(attrs)
attrs[Info.DISPLAY_NAME] = guidebook._default_display_name(attrs)
self._set_scene_metadata(attrs)
self._set_family_metadata(attrs)
self._set_category_metadata(attrs)
return DataArray(combined_data, attrs=attrs)
def _parse_style_attributes_config(self) -> Dict[str, Set[str]]:
"""
Extract the ``style_attributes`` section from the reader config.
This function doesn't validate whether the style attributes or the
product names exist.
The returned dictionary lists for each product, which dependent styles
it is configured for.
Maintenance: this swaps the mapping direction since the new one is more
useful for querying which features (style attributes) should be styled depending
on a given product
:return: mapping of products to style attributes
"""
style_attributes = config.get(f"data_reading.{self.reader}.style_attributes", None)
if not style_attributes:
return {}
style_attributes_by_product = {}
for style_attribute, product_names in style_attributes.items():
for product_name in product_names:
if product_name not in style_attributes_by_product:
style_attributes_by_product[product_name] = {style_attribute}
else:
style_attributes_by_product[product_name].add(style_attribute)
return style_attributes_by_product
def _combine_points(self, datasets: DatasetDict, converter) -> Dict[DataID, DataArray]:
# Currently only the "fill" can be colored by a mapping from product values to colors (using a colormap) but in
# the future other features could be colored as well, e.g. like "stroke".
supported_color_by_style_attrs = {"fill"}
style_attributes_by_product = self._parse_style_attributes_config()
converted_datasets = {}
for ds_id, ds in datasets.items():
if ds.attrs[Info.KIND] != Kind.POINTS:
continue
ds_name = ds_id["name"]
do_color_by_values = ds_name in style_attributes_by_product and bool(
style_attributes_by_product[ds_name] & supported_color_by_style_attrs
)
try:
# Only if we want to use the values of the data to color
# markers we need the dataset's array itself, otherwise the
# coordinates alone are sufficient.
convertible_ds = (
[ds.area.lons, ds.area.lats, ds] if do_color_by_values else [ds.area.lons, ds.area.lats]
)
except AttributeError:
# Some products (e.g. `latitude` and `longitude`) may not
# (for whatever reason) have an associated SwathDefinition.
# Without it, there is no geolocation information per data
# point, because it is taken from its fields 'lats', 'lons'.
# This cannot be healed, point data loading fails.
LOG.error(
f"Dataset '{ds.attrs['name']}' of kind POINTS has no point"
f" coordinates (lats, lons) (Missing SwathDefinition)."
)
continue
converted_datasets[ds_id] = converter(convertible_ds, ds.attrs)
return converted_datasets
def _parse_coords_end_config(self) -> List[str]:
"""
Parse the ``coordinates_end`` section of the reader config.
:return: List of ``coords`` identifiers
"""
coords_end = config.get(f"data_reading.{self.reader}.coordinates_end", None)
if not coords_end:
return []
if len(coords_end) < 2 or len(coords_end) > 2:
LOG.warning("expected 2 end coordinates for LINES")
return coords_end
def _combine_lines(self, datasets: DatasetDict, converter) -> Dict[DataID, DataArray]:
"""
Find convertible LINES datasets in the ``DatasetDict``. The ``converter``
function is then used to generate new DataArrays.
The positions for the tip and base of the lines are extracted from the
dataset metadata. The dataset itself will be discarded.
:param datasets: all loaded datasets and previously converted datasets
:param converter: function to convert a list of DataArrays
:return: mapping of converted DataID to new DataArray
"""
coords_end = self._parse_coords_end_config()
converted_datasets = {}
for ds_id, ds in datasets.items():
if ds.attrs[Info.KIND] != Kind.LINES:
continue
convertible_ds = []
try:
for coord in coords_end:
convertible_ds.append(ds.coords[coord]) # base
except KeyError:
LOG.error(f"dataset has no coordinates: {ds.attrs['name']}")
continue
if len(convertible_ds) < 2:
LOG.error(f"LINES dataset needs 4 coordinates: {ds.attrs['name']}")
continue
convertible_ds.extend([ds.area.lons, ds.area.lats]) # tip
converted_datasets[ds_id] = converter(convertible_ds, ds.attrs)
return converted_datasets
def _revise_all_datasets(self) -> DatasetDict:
"""
Revise all datasets and convert the data representation of the POINTS
and LINES records found to the data format VisPy needs to display them
as such.
The original datasets are not kept in the scene but replaced by the
converted datasets.
:return: ``DatasetDict`` with the loaded and converted datasets
"""
loaded_datasets = DatasetDict()
for ds_id in self.scn.keys():
ds = self.scn[ds_id]
loaded_datasets[ds_id] = ds
# Note: If you do not want the converted data sets to overwrite the
# original data sets in a future development of this software,
# you can pass a suitable prefix to self._stack_data_arrays() (which
# will be used to create a new name for the converted data set) by
# defining the appropriate converters, for example, as follows:
# self._combine_points: lambda *args: self._stack_data_arrays(*args, "POINTS-"),
converters = {
self._combine_points: lambda *args: self._stack_data_arrays(*args),
self._combine_lines: lambda *args: self._stack_data_arrays(*args),
}
for detector, converter in converters.items():
converted_datasets = detector(loaded_datasets, converter)
for old_ds_id, new_ds in converted_datasets.items():
del loaded_datasets[old_ds_id]
new_ds_id = DataID.from_dataarray(new_ds)
loaded_datasets[new_ds_id] = new_ds
for ds_id, ds in loaded_datasets.items():
self.scn[ds_id] = ds
return loaded_datasets
@staticmethod
def _area_to_sift_attrs(area):
"""Area to uwsift keys"""
if not isinstance(area, AreaDefinition):
raise NotImplementedError(
"Only AreaDefinition datasets can be loaded at this time. "
"If you're trying to read data that needs resampling, like LEO data, remember "
"to select a resampling method in the Open File Wizard, and set "
"'geometry_definition: SwathDefinition' in your reader config."
)
return {
Info.PROJ: area.proj_str,
Info.ORIGIN_X: area.area_extent[0], # == lower_(left_)x
Info.ORIGIN_Y: area.area_extent[3], # == upper_(right_)y
Info.CELL_WIDTH: area.pixel_size_x,
Info.CELL_HEIGHT: -area.pixel_size_y,
}
def _get_grid_info(self):
grid_origin = config.get(f"data_reading.{self.reader}.grid.origin", "NW")
grid_first_index_x = config.get(f"data_reading.{self.reader}.grid.first_index_x", 0)
grid_first_index_y = config.get(f"data_reading.{self.reader}.grid.first_index_y", 0)
return {
Info.GRID_ORIGIN: grid_origin,
Info.GRID_FIRST_INDEX_X: grid_first_index_x,
Info.GRID_FIRST_INDEX_Y: grid_first_index_y,
}
[docs]
def begin_import_products(self, *product_ids) -> Generator[import_progress, None, None]: # noqa: C901
if self.use_inventory_db:
products = self._get_products_from_inventory_db(product_ids)
else:
products = product_ids
merge_with_existing = self.merge_target is not None
# FIXME: Don't recreate the importer every time we want to load data
dataset_ids = [prod.info["_satpy_id"] for prod in products]
self.scn.load(dataset_ids, pad_data=not merge_with_existing, upper_right_corner="NE")
# If there are datasets missing, resampling is needed to create them:
if self.scn.missing_datasets:
# About the next strange line of code: keep a reference to the
# original scene to work around an issue in the resampling
# implementation for NetCDF data: otherwise the original data
# would be garbage collected too early.
self.scn_original = self.scn # noqa - Do not simply remove
# deactivating reduce_data, see https://github.com/pytroll/satpy/issues/2476
self.scn = self.scn.resample(resampler="native", reduce_data=False)
if self.resampling_info:
self._preprocess_products_with_resampling()
num_stages = len(products)
for idx, (prod, ds_id) in enumerate(zip(products, dataset_ids)):
dataset = (
self.scn[ds_id] if prod.info[Info.KIND] != Kind.MC_IMAGE else get_enhanced_image(self.scn[ds_id]).data
)
if prod.info[Info.KIND] == Kind.MC_IMAGE:
# The dimension of the dataset is ('bands', 'y', 'x')
# but for later the dimension ('y', 'x', 'bands') is needed
dataset = dataset.transpose("y", "x", "bands")
# Since in the first SatpyImporter loading pass (see
# load_all_datasets()) no padding is applied (pad_data=False), the
# Info.SHAPE stored at that time may not be the same as it is now if
# we load with padding now. In that case we must update the
# prod.info[Info.SHAPE] with the actual shape, but let's do this in
# any case, it doesn't hurt.
prod.info[Info.SHAPE] = dataset.shape
kind = prod.info[Info.KIND]
if prod.content:
LOG.warning("content was already available, skipping import")
continue
now = datetime.utcnow()
if kind in [Kind.LINES, Kind.POINTS]:
if len(dataset.shape) == 1:
LOG.error(f"one dimensional dataset can't be loaded: {ds_id['name']}")
continue
completion, content, data_memmap = self._create_unstructured_points_dataset_content(dataset, now, prod)
yield import_progress(
uuid=prod.uuid,
stages=num_stages,
current_stage=idx,
completion=completion,
stage_desc=f"SatPy {kind.name} data add to workspace",
dataset_info=None,
data=data_memmap,
content=content,
)
continue
uuid = prod.uuid
if merge_with_existing:
existing_product = self.merge_target
segments = self._extract_segment_number()
uuid = existing_product.uuid
c = existing_product.content[0]
img_data = c.img_data
self.merge_data_into_memmap(dataset.data, img_data, segments)
else:
area_info = self._area_to_sift_attrs(dataset.attrs["area"])
grid_info = self._get_grid_info()
if kind == Kind.MC_IMAGE:
c, img_data = self._create_mc_image_dataset_content(dataset, now, prod, area_info, grid_info)
else:
c, img_data = self._create_image_dataset_content(dataset, now, prod, area_info, grid_info)
c.info[Info.KIND] = prod.info[Info.KIND]
c.img_data = img_data
c.source_files = set() # TODO(AR): adds member to the ContentImage object 'c'
# c.info.update(prod.info) would just make everything leak together so let's not do it
prod.content.append(c)
self._add_content_to_cache(c)
required_files = _get_paths_of_required_aux_files(self.scn, self.required_aux_file_types)
# Note loaded source files but not required files. This is necessary
# because the merger will try to not load already loaded files again
# but might need to reload required files.
c.source_files |= set(self.filenames) - required_files
yield import_progress(
uuid=uuid,
stages=num_stages,
current_stage=idx,
completion=1.0,
stage_desc=f"SatPy {kind.name} data add to workspace",
dataset_info=None,
data=img_data,
content=c,
)
def _get_products_from_inventory_db(self, product_ids):
if product_ids:
products = [self._S.query(Product).filter_by(id=anid).one() for anid in product_ids]
assert products # nosec B101
else:
products = list(
self._S.query(Resource, Product)
.filter(Resource.path.in_(self.filenames))
.filter(Product.resource_id == Resource.id)
.all()
)
assert products # nosec B101
return products
def _create_mc_image_dataset_content(self, dataset, now, prod, area_info, grid_info):
data_filename, img_data = self._create_data_memmap_file(dataset.data, dataset.data.dtype, prod)
shape = prod.info[Info.SHAPE]
c = ContentMultiChannelImage(
lod=0,
resolution=int(min(abs(area_info[Info.CELL_WIDTH]), abs(area_info[Info.CELL_HEIGHT]))),
atime=now,
mtime=now,
# info about the data array memmap
path=data_filename,
rows=shape[0],
cols=shape[1],
bands=shape[2],
proj4=area_info[Info.PROJ],
dtype=str(dataset.dtype),
cell_width=area_info[Info.CELL_WIDTH],
cell_height=area_info[Info.CELL_HEIGHT],
origin_x=area_info[Info.ORIGIN_X],
origin_y=area_info[Info.ORIGIN_Y],
grid_origin=grid_info[Info.GRID_ORIGIN],
grid_first_index_x=grid_info[Info.GRID_FIRST_INDEX_X],
grid_first_index_y=grid_info[Info.GRID_FIRST_INDEX_Y],
)
return c, img_data
def _create_image_dataset_content(self, dataset, now, prod, area_info, grid_info):
# For kind IMAGE the dtype must be float32 seemingly, see class
# Column, comment for 'dtype' and the construction of c = Content
# just below.
# FIXME: It is dubious to enforce that type conversion to happen in
# _create_data_memmap_file, but otherwise IMAGES of pixel counts
# data (dtype = np.uint16) crash.
data_filename, img_data = self._create_data_memmap_file(dataset.data, np.float32, prod)
shape = prod.info[Info.SHAPE]
c = ContentImage(
lod=0,
resolution=int(min(abs(area_info[Info.CELL_WIDTH]), abs(area_info[Info.CELL_HEIGHT]))),
atime=now,
mtime=now,
# info about the data array memmap
path=data_filename,
rows=shape[0],
cols=shape[1],
proj4=area_info[Info.PROJ],
dtype="float32",
cell_width=area_info[Info.CELL_WIDTH],
cell_height=area_info[Info.CELL_HEIGHT],
origin_x=area_info[Info.ORIGIN_X],
origin_y=area_info[Info.ORIGIN_Y],
grid_origin=grid_info[Info.GRID_ORIGIN],
grid_first_index_x=grid_info[Info.GRID_FIRST_INDEX_X],
grid_first_index_y=grid_info[Info.GRID_FIRST_INDEX_Y],
)
return c, img_data
def _create_unstructured_points_dataset_content(self, dataset, now, prod):
data_filename, data_memmap = self._create_data_memmap_file(dataset.data, dataset.dtype, prod)
shape = prod.info[Info.SHAPE]
content = ContentUnstructuredPoints(
atime=now,
mtime=now,
# info about the data array memmap
path=data_filename,
n_points=shape[0],
n_dimensions=shape[1],
dtype=dataset.dtype,
)
content.info[Info.KIND] = prod.info[Info.KIND]
prod.content.append(content)
self._add_content_to_cache(content)
completion = 2.0
return completion, content, data_memmap
def _preprocess_products_with_resampling(self) -> None:
resampler: str = self.resampling_info["resampler"]
max_area = self.scn.finest_area()
if (
isinstance(max_area, AreaDefinition)
and max_area.area_id == self.resampling_info["area_id"]
and self.resampling_info["custom"] is False
):
# resampling is not needed
LOG.info(
f"Source and target area ID are identical:"
f" '{self.resampling_info['area_id']}'."
f" Skipping resampling."
)
else:
# resampling is needed
area_name = max_area.area_id if hasattr(max_area, "area_id") else max_area.name
target_area_def = AreaDefinitionsManager.area_def_by_id(self.resampling_info["area_id"])
if self.resampling_info["custom"] is False:
LOG.info(
f"Resampling from area ID/name '{area_name}'"
f" to area ID '{self.resampling_info['area_id']}'"
f" with method '{resampler}'"
)
else:
# custom resolution
resampler_shape = self.resampling_info["shape"]
resampler_width = resampler_shape[0]
resampler_height = resampler_shape[1]
LOG.info(
f"Resampling area with the characteristics of '{area_name}'"
f" with method '{resampler}'"
f" with CUSTOM SHAPE: '{resampler_width}, {resampler_height}'"
)
AreaDefinitionsManager.prepare_area_def_for_resampling(
target_area_def, resampler_width, resampler_height
)
# About the next strange line of code: keep a reference to the
# original scene to work around an issue in the resampling
# implementation for NetCDF data: otherwise the original data
# would be garbage collected too early.
if not self.scn_original:
self.scn_original = self.scn # noqa - Do not simply remove
# deactivating reduce_data, see https://github.com/pytroll/satpy/issues/2476
reduce_data = False if resampler == "native" else True
self.scn = self.scn.resample(
target_area_def,
resampler=resampler,
radius_of_influence=self.resampling_info["radius_of_influence"],
reduce_data=reduce_data,
)
def _get_fci_segment_height(self, segment_number: int, segment_width: int) -> int:
try:
seg_heights = self.scn._readers[self.reader].segment_heights
# we can assume that we're reading only from one filetype (either FDHSI or HRFI)
file_type = list(self.scn._readers[self.reader].file_handlers.keys())[0]
return seg_heights(file_type, segment_width)[segment_number - 1]
except AttributeError:
LOG.exception(
"You're probably using an old version of Satpy. Please update Satpy to be >=0.39.0 "
"to use merge_with_existing=True with the FCI reader."
)
raise
return -1
def _calc_segment_heights(self, segments_data, segments_indices):
def get_segment_height_calculator() -> Callable:
if self.reader in ["fci_l1c_nc", "fci_l1c_fdhsi"]:
return self._get_fci_segment_height
else:
return lambda x, y: segments_data.shape[0] // len(segments_indices)
calc_segment_height = get_segment_height_calculator()
expected_num_segments = self._extract_expected_segments()
segment_heights = np.zeros((expected_num_segments,))
for i in range(1, expected_num_segments + 1):
segment_heights[i - 1] = calc_segment_height(i, segments_data.shape[1])
return segment_heights.astype(int)
def _determine_segments_to_image_mapping(self, segments_data, segments_indices) -> Tuple[List, List]:
segment_heights = self._calc_segment_heights(segments_data, segments_indices)
segment_starts_stops = []
image_starts_stops = []
visited_segments: List[int] = []
for segment_num in segments_indices:
curr_segment_height_idx = segment_num - 1
curr_segment_height = segment_heights[curr_segment_height_idx]
image_stop_pos = segment_heights[curr_segment_height_idx:].sum()
image_start_pos = image_stop_pos - curr_segment_height
curr_segment_stop = segments_data.shape[0]
if visited_segments:
curr_segment_stop -= segment_heights[visited_segments].sum()
curr_segment_start = curr_segment_stop - curr_segment_height
visited_segments.append(curr_segment_height_idx)
segment_starts_stops.append((curr_segment_start, curr_segment_stop))
image_starts_stops.append((image_start_pos, image_stop_pos))
return segment_starts_stops, image_starts_stops
[docs]
def merge_data_into_memmap(self, segments_data, image_data, segments_indices):
"""
Merge new segments from data into img_data. The data is expected to
contain the data for all segments with the last segment (that has the
largest segment number) as first in the data array.
The segments list defines which chunk of data belongs to which segment.
The list must be sorted ascending.
Those requirements match the current Satpy behavior which loads segments
in ascending order and produce a data array with the last segments data
first.
:param segments_data: the segments data as provided by Satpy importer
:param image_data: the dataset data where the segments are merged into
:param segments_indices: list of segments whose data is to be merged
Note: this is not the highest segment number in the current segments
list but the highest segment number which can appear for the product.
"""
segment_starts_stops, image_starts_stops = self._determine_segments_to_image_mapping(
segments_data, segments_indices
)
for i in range(len(segment_starts_stops)):
segment_start = segment_starts_stops[i][0]
segment_stop = segment_starts_stops[i][1]
image_start = image_starts_stops[i][0]
image_stop = image_starts_stops[i][1]
image_data[image_start:image_stop, :] = segments_data[segment_start:segment_stop, :]
def _add_content_to_cache(self, c: Content) -> None:
if self.use_inventory_db:
self._S.add(c)
self._S.commit()
def _create_data_memmap_file(self, data: da.array, dtype, prod: Product) -> Tuple[str, Optional[np.memmap]]:
"""
Create *binary* file in the current working directory to cache `data`
as numpy memmap. The filename extension of the file is derived from the
kind of the given `prod`.
'dtype' must be given explicitly because it is not necessarily
``dtype == data.dtype`` (caller decides).
"""
# shovel that data into the memmap incrementally
kind = prod.info[Info.KIND]
data_filename = "{}.{}".format(prod.uuid, kind.name.lower())
data_path = os.path.join(self._cwd, data_filename)
if data is None or data.size == 0:
# For empty data memmap is not possible
return data_filename, None
data_memmap = np.memmap(data_path, dtype=dtype, shape=data.shape, mode="w+")
da.store(data, data_memmap)
return data_filename, data_memmap