#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
visuals.py
~~~~~~~~~~~
PURPOSE
Dataset representation - the "physical" realization of content to draw on the map.
A dataset representation can have multiple levels of detail.
REFERENCES
REQUIRES
:author: R.K.Garcia <rayg@ssec.wisc.edu>
:copyright: 2014 by University of Wisconsin Regents, see AUTHORS for more details
:license: GPLv3, see LICENSE for more details
"""
import logging
from datetime import datetime
from typing import Optional
import numpy as np
import shapefile
from vispy.color import Color
from vispy.gloo import VertexBuffer
from vispy.scene.visuals import create_visual_node
from vispy.util.profiler import Profiler
from vispy.visuals import ImageVisual, IsocurveVisual, LineVisual
# The below imports are needed because we subclassed ImageVisual and ArrowVisual
from vispy.visuals.line.arrow import ArrowVisual, _ArrowHeadVisual
from vispy.visuals.line.line import _AggLineVisual, _GLLineVisual
from vispy.visuals.shaders import Function, FunctionChain
from vispy.visuals.transforms import as_vec4
from uwsift.common import (
DEFAULT_PROJECTION,
DEFAULT_TEXTURE_HEIGHT,
DEFAULT_TEXTURE_WIDTH,
DEFAULT_TILE_HEIGHT,
DEFAULT_TILE_WIDTH,
TESS_LEVEL,
Box,
IndexBox,
Point,
Resolution,
ViewBox,
)
from uwsift.view.texture_atlas import (
MultiChannelGPUScaledTexture2D,
MultiChannelTextureAtlas2D,
TextureAtlas2D,
)
from uwsift.view.tile_calculator import (
TileCalculator,
calc_pixel_size,
get_reference_points,
)
__author__ = "rayg"
__docformat__ = "reStructuredText"
LOG = logging.getLogger(__name__)
# if the absolute value of a vertex coordinate is beyond 'CANVAS_EPSILON'
# then we consider it invalid
# these values can get large when zoomed way in
CANVAS_EPSILON = 1e5
# CANVAS_EPSILON = 1e30
[docs]
class ArrayProxy(object):
def __init__(self, ndim, shape):
self.ndim = ndim
self.shape = shape
[docs]
class TextureTileState(object):
"""Object to hold the state of the current tile texture.
Terms:
- itile: Image Tile, Tile in the image being shown. Coordinates are (Y, X)
- lod: Level of Detail, Level of detail for the image tiles (1 highest - 5 lower)
- ttile: Texture Tile, Tile in the actual GPU texture storage (0 to `num_tiles`)
This class is meant to be used as a bookkeeper/consultant right before taking action
on the Texture Atlas.
"""
def __init__(self, num_tiles):
self.num_tiles = num_tiles
self.reset()
def __getitem__(self, item):
"""Get the texture index associated with this image tile index."""
return self.itile_cache[item]
def __contains__(self, item):
"""Have we already added this image tile index (yidx, xidx)."""
return item in self.itile_cache
[docs]
def reset(self):
self.itile_cache = {}
self._rev_cache = {}
# True if the data doesn't matter, False if data matters
self.tile_free = [True] * self.num_tiles
self.itile_age = []
def _next_available_tile(self):
for idx, tile_free in enumerate(self.tile_free):
if tile_free:
return idx
# We don't have any free tiles, remove the oldest one
LOG.debug("Expiring image tile from texture atlas: %r", self.itile_age[0])
ttile_idx = self._remove_tile(self.itile_age[0])
return ttile_idx
def _refresh_age(self, itile_idx):
"""Update the age of an image tile so it is less likely to expire."""
try:
# Remove it from wherever it is
self.itile_age.remove(itile_idx)
except ValueError:
# we haven't heard about this tile, that's ok, we'll add it to the list
pass
# Put it to the end as the "youngest" tile
self.itile_age.append(itile_idx)
[docs]
def add_tile(self, itile_idx, expires=True):
"""Get texture index for new tile. If tile is already known return its current location.
Note, this should be called even when the caller knows the tile exists to refresh the "age".
"""
# Have we already added this tile, get the tile index
if itile_idx in self:
if expires:
self._refresh_age(itile_idx)
return self[itile_idx]
ttile_idx = self._next_available_tile()
self.itile_cache[itile_idx] = ttile_idx
self._rev_cache[ttile_idx] = itile_idx
self.tile_free[ttile_idx] = False
if expires:
self._refresh_age(itile_idx)
return ttile_idx
def _remove_tile(self, itile_idx):
ttile_idx = self.itile_cache.pop(itile_idx)
self._rev_cache.pop(ttile_idx)
self.tile_free[ttile_idx] = True
self.itile_age.remove(itile_idx)
return ttile_idx
[docs]
class SIFTTiledGeolocatedMixin:
def __init__(
self,
data,
*area_params,
tile_shape=(DEFAULT_TILE_HEIGHT, DEFAULT_TILE_WIDTH),
texture_shape=(DEFAULT_TEXTURE_HEIGHT, DEFAULT_TEXTURE_WIDTH),
wrap_lon=False,
projection=DEFAULT_PROJECTION,
**visual_kwargs,
):
origin_x, origin_y, cell_width, cell_height = area_params
if visual_kwargs.get("method", "subdivide") != "subdivide":
raise ValueError("Only 'subdivide' drawing method is supported.")
visual_kwargs["method"] = "subdivide"
if "grid" in visual_kwargs:
raise ValueError("The 'grid' keyword argument is not supported with the tiled mixin.")
# visual nodes already have names, so be careful
if not hasattr(self, "name"):
self.name = visual_kwargs.pop("name", None)
self._init_geo_parameters(
origin_x,
origin_y,
cell_width,
cell_height,
projection,
texture_shape,
tile_shape,
wrap_lon,
visual_kwargs.get("shape"),
data,
)
# Call the init of the Visual
super().__init__(data, **visual_kwargs)
def _init_geo_parameters(
self, origin_x, origin_y, cell_width, cell_height, projection, texture_shape, tile_shape, wrap_lon, shape, data
):
self._viewable_mesh_mask = None
self._ref1 = None
self._ref2 = None
self.origin_x = origin_x
self.origin_y = origin_y
self.cell_width = cell_width
self.cell_height = cell_height # Note: cell_height is usually negative
self.texture_shape = texture_shape
self.tile_shape = tile_shape
self.num_tex_tiles = self.texture_shape[0] * self.texture_shape[1]
self._stride = (0, 0)
self._latest_tile_box = None
self.wrap_lon = wrap_lon
self._tiles = {}
assert shape or data is not None, "`data` or `shape` must be provided" # nosec B101
self.shape = shape or data.shape
self.ndim = len(self.shape) or data.ndim
# Where does this image lie in this lonely world
self.calc = TileCalculator(
self.name,
self.shape,
Point(x=self.origin_x, y=self.origin_y),
Resolution(dy=abs(self.cell_height), dx=abs(self.cell_width)),
self.tile_shape,
self.texture_shape,
wrap_lon=self.wrap_lon,
projection=projection,
)
# What tiles have we used and can we use
self.texture_state = TextureTileState(self.num_tex_tiles)
def _normalize_data(self, data):
if data is not None and data.dtype == np.float64:
data = data.astype(np.float32)
return data
def _build_texture_tiles(self, data, stride, tile_box: IndexBox):
"""Prepare and organize strided data in to individual tiles with associated information."""
data = self._normalize_data(data)
LOG.debug(
"Uploading texture data for %d tiles (%r)",
(tile_box.bottom - tile_box.top) * (tile_box.right - tile_box.left),
tile_box,
)
# Tiles start at upper-left so go from top to bottom
tiles_info = []
for tiy in range(tile_box.top, tile_box.bottom):
for tix in range(tile_box.left, tile_box.right):
already_in = (stride, tiy, tix) in self.texture_state
# Update the age if already in there
# Assume that texture_state does not change from the main thread if this is run in another
tex_tile_idx = self.texture_state.add_tile((stride, tiy, tix))
if already_in:
# FIXME: we should make a list/set of the tiles we need to add before this
continue
# Assume we were given a total image worth of this stride
y_slice, x_slice = self.calc.calc_tile_slice(tiy, tix, stride)
tile_data = self._slice_texture_tile(data, y_slice, x_slice)
tiles_info.append((stride, tiy, tix, tex_tile_idx, tile_data))
return tiles_info
def _slice_texture_tile(self, data, y_slice, x_slice):
# force a copy of the data from the content array (provided by the workspace)
# to a vispy-compatible contiguous float array
# this can be a potentially time-expensive operation since content array is
# often huge and always memory-mapped, so paging may occur
# we don't want this paging deferred until we're back in the GUI thread pushing data to OpenGL!
return np.array(data[y_slice, x_slice], dtype=np.float32)
def _set_texture_tiles(self, tiles_info):
for tile_info in tiles_info:
stride, tiy, tix, tex_tile_idx, data = tile_info
self._texture.set_tile_data(tex_tile_idx, data)
def _build_vertex_tiles(self, preferred_stride, tile_box: IndexBox):
"""Rebuild the vertex buffers used for rendering the image when using
the subdivide method.
SIFT Note: Copied from 0.5.0dev original ImageVisual class
"""
total_num_tiles = (tile_box.bottom - tile_box.top) * (tile_box.right - tile_box.left)
if total_num_tiles <= 0:
# we aren't looking at this image
# FIXME: What's the correct way to stop drawing here
raise RuntimeError("View calculations determined a negative number of tiles are visible")
elif total_num_tiles > self.num_tex_tiles:
LOG.warning("Current view sees more tiles than can be held in the GPU")
# We continue on, showing as many tiles as we can
tex_coords = np.empty((6 * total_num_tiles * (TESS_LEVEL * TESS_LEVEL), 2), dtype=np.float32)
vertices = np.empty((6 * total_num_tiles * (TESS_LEVEL * TESS_LEVEL), 2), dtype=np.float32)
# What tile are we currently describing out of all the tiles being viewed
used_tile_idx = -1
LOG.debug("Building vertex data for %d tiles (%r)", total_num_tiles, tile_box)
tl = TESS_LEVEL * TESS_LEVEL
# Tiles start at upper-left so go from top to bottom
for tiy in range(tile_box.top, tile_box.bottom):
for tix in range(tile_box.left, tile_box.right):
# Update the index here because we have multiple exit/continuation points
used_tile_idx += 1
# Check if the tile we want to draw is actually in the GPU
# if not (atlas too small?) fill with zeros and keep going
if (preferred_stride, tiy, tix) not in self.texture_state:
# THIS SHOULD NEVER HAPPEN IF TEXTURE BUILDING IS DONE CORRECTLY AND THE ATLAS IS BIG ENOUGH
tile_start = TESS_LEVEL * TESS_LEVEL * used_tile_idx * 6
tile_end = TESS_LEVEL * TESS_LEVEL * (used_tile_idx + 1) * 6
tex_coords[tile_start:tile_end, :] = 0
vertices[tile_start:tile_end, :] = 0
continue
# we should have already loaded the texture data in to the GPU so get the index of that texture
tex_tile_idx = self.texture_state[(preferred_stride, tiy, tix)]
factor_rez, offset_rez = self.calc.calc_tile_fraction(tiy, tix, preferred_stride)
tex_coords[tl * used_tile_idx * 6 : tl * (used_tile_idx + 1) * 6, :] = (
self.calc.calc_texture_coordinates(
tex_tile_idx, factor_rez, offset_rez, tessellation_level=TESS_LEVEL
)
)
vertices[tl * used_tile_idx * 6 : tl * (used_tile_idx + 1) * 6, :] = self.calc.calc_vertex_coordinates(
tiy,
tix,
preferred_stride[0],
preferred_stride[1],
factor_rez,
offset_rez,
tessellation_level=TESS_LEVEL,
)
return vertices, tex_coords
def _set_vertex_tiles(self, vertices, tex_coords):
self._subdiv_position.set_data(vertices.astype("float32"))
self._subdiv_texcoord.set_data(tex_coords.astype("float32"))
[docs]
def determine_reference_points(self):
# Image points transformed to canvas coordinates
img_cmesh = self.transforms.get_transform().map(self.calc.image_mesh)
# Mask any points that are really far off screen (can't be transformed)
valid_mask = (np.abs(img_cmesh[:, 0]) < CANVAS_EPSILON) & (np.abs(img_cmesh[:, 1]) < CANVAS_EPSILON)
# The image mesh projected to canvas coordinates (valid only)
img_cmesh = img_cmesh[valid_mask]
# The image mesh of only valid "viewable" projected coordinates
img_vbox = self.calc.image_mesh[valid_mask]
if not img_cmesh[:, 0].size or not img_cmesh[:, 1].size:
self._viewable_mesh_mask = None
self._ref1, self._ref2 = None, None
return
x_cmin, x_cmax = img_cmesh[:, 0].min(), img_cmesh[:, 0].max()
y_cmin, y_cmax = img_cmesh[:, 1].min(), img_cmesh[:, 1].max()
center_x = (x_cmax - x_cmin) / 2.0 + x_cmin
center_y = (y_cmax - y_cmin) / 2.0 + y_cmin
dist = img_cmesh.copy()
dist[:, 0] = center_x - img_cmesh[:, 0]
dist[:, 1] = center_y - img_cmesh[:, 1]
self._viewable_mesh_mask = valid_mask
self._ref1, self._ref2 = get_reference_points(dist, img_vbox)
[docs]
def get_view_box(self):
"""Calculate shown portion of image and image units per pixel
This method utilizes a precomputed "mesh" of relatively evenly
spaced points over the entire image space. This mesh is transformed
to the canvas space (-1 to 1 user-viewed space) to figure out which
portions of the image are currently being viewed and which portions
can actually be projected on the viewed projection.
While the result of the chosen method may not always be completely
accurate, it should work for all possible viewing cases.
"""
if self._viewable_mesh_mask is None or self.canvas.size[0] == 0 or self.canvas.size[1] == 0:
raise ValueError("Image '%s' is not viewable in this projection" % (self.name,))
# Image points transformed to canvas coordinates
img_cmesh = self.transforms.get_transform().map(self.calc.image_mesh)
# The image mesh projected to canvas coordinates (valid only)
img_cmesh = img_cmesh[self._viewable_mesh_mask]
# The image mesh of only valid "viewable" projected coordinates
img_vbox = self.calc.image_mesh[self._viewable_mesh_mask]
ref_idx_1, ref_idx_2 = get_reference_points(img_cmesh, img_vbox)
dx, dy = calc_pixel_size(
img_cmesh[(self._ref1, self._ref2), :], img_vbox[(self._ref1, self._ref2), :], self.canvas.size
)
view_extents = self.calc.calc_view_extents(img_cmesh[ref_idx_1], img_vbox[ref_idx_1], self.canvas.size, dx, dy)
return ViewBox(*view_extents, dx=dx, dy=dy)
def _get_stride(self, view_box):
return self.calc.calc_stride(view_box)
[docs]
def assess(self):
"""Determine if a retile is needed.
Tell workspace we will be needed
"""
try:
view_box = self.get_view_box()
preferred_stride = self._get_stride(view_box)
tile_box = self.calc.visible_tiles(view_box, stride=preferred_stride, extra_tiles_box=Box(1, 1, 1, 1))
except ValueError as e:
# If image is outside of canvas, then an exception will be raised
LOG.warning("Could not determine viewable image area for '{}': {}".format(self.name, e))
return False, self._stride, self._latest_tile_box
num_tiles = (tile_box.bottom - tile_box.top) * (tile_box.right - tile_box.left)
LOG.debug(
"Assessment: Prefer '%s' have '%s', was looking at %r, now looking at %r",
preferred_stride,
self._stride,
self._latest_tile_box,
tile_box,
)
# If we zoomed out or we panned
need_retile = (num_tiles > 0) and (preferred_stride != self._stride or self._latest_tile_box != tile_box)
return need_retile, preferred_stride, tile_box
[docs]
def retile(self, data, preferred_stride, tile_box):
"""Get data from workspace and retile/retexture as needed."""
tiles_info = self._build_texture_tiles(data, preferred_stride, tile_box)
vertices, tex_coords = self._build_vertex_tiles(preferred_stride, tile_box)
return tiles_info, vertices, tex_coords
[docs]
def set_retiled(self, preferred_stride, tile_box, tiles_info, vertices, tex_coords):
self._set_texture_tiles(tiles_info)
self._set_vertex_tiles(vertices, tex_coords)
# don't update here, the caller will do that
# Store the most recent level of detail that we've done
self._stride = preferred_stride
self._latest_tile_box = tile_box
[docs]
class TiledGeolocatedImageVisual(SIFTTiledGeolocatedMixin, ImageVisual):
def __init__(self, data, origin_x, origin_y, cell_width, cell_height, **image_kwargs):
super().__init__(data, origin_x, origin_y, cell_width, cell_height, **image_kwargs)
def _init_texture(self, data, texture_format):
if self._interpolation == "bilinear":
texture_interpolation = "linear"
else:
texture_interpolation = "nearest"
tex = TextureAtlas2D(
self.texture_shape,
tile_shape=self.tile_shape,
interpolation=texture_interpolation,
internalformat="auto",
)
return tex
[docs]
def set_data(self, image):
"""Set the data
Parameters
----------
image : array-like
The image data.
"""
if self._data is not None:
raise NotImplementedError("This image subclass does not support the 'set_data' method.")
# only do this on __init__
super().set_data(image)
def _build_texture(self):
# _build_texture should not be used in this class, use the 2-step
# process of '_build_texture_tiles' and '_set_texture_tiles'
self._need_texture_upload = False
def _build_vertex_data(self):
# _build_vertex_data should not be used in this class, use the 2-step
# process of '_build_vertex_tiles' and '_set_vertex_tiles'
return
TiledGeolocatedImage = create_visual_node(TiledGeolocatedImageVisual)
_rgb_texture_lookup = """
vec4 texture_lookup(vec2 texcoord) {
if(texcoord.x < 0.0 || texcoord.x > 1.0 ||
texcoord.y < 0.0 || texcoord.y > 1.0) {
discard;
}
vec4 val;
val.r = texture2D($texture_r, texcoord).r;
val.g = texture2D($texture_g, texcoord).r;
val.b = texture2D($texture_b, texcoord).r;
val.a = 1.0;
return val;
}"""
_apply_clim = """
vec4 apply_clim(vec4 color) {
// If all the pixels are NaN make it completely transparent
// http://stackoverflow.com/questions/11810158/how-to-deal-with-nan-or-inf-in-opengl-es-2-0-shaders
if (
!(color.r <= 0.0 || 0.0 <= color.r) &&
!(color.g <= 0.0 || 0.0 <= color.g) &&
!(color.b <= 0.0 || 0.0 <= color.b)) {
color.a = 0;
}
// if color is NaN, set to minimum possible value
color.r = !(color.r <= 0.0 || 0.0 <= color.r) ? min($clim_r.x, $clim_r.y) : color.r;
color.g = !(color.g <= 0.0 || 0.0 <= color.g) ? min($clim_g.x, $clim_g.y) : color.g;
color.b = !(color.b <= 0.0 || 0.0 <= color.b) ? min($clim_b.x, $clim_b.y) : color.b;
// clamp data to minimum and maximum of clims
color.r = clamp(color.r, min($clim_r.x, $clim_r.y), max($clim_r.x, $clim_r.y));
color.g = clamp(color.g, min($clim_g.x, $clim_g.y), max($clim_g.x, $clim_g.y));
color.b = clamp(color.b, min($clim_b.x, $clim_b.y), max($clim_b.x, $clim_b.y));
// linearly scale data between clims
color.r = (color.r - $clim_r.x) / ($clim_r.y - $clim_r.x);
color.g = (color.g - $clim_g.x) / ($clim_g.y - $clim_g.x);
color.b = (color.b - $clim_b.x) / ($clim_b.y - $clim_b.x);
return max(color, 0);
}
"""
_apply_gamma = """
vec4 apply_gamma(vec4 color) {
color.r = pow(color.r, $gamma_r);
color.g = pow(color.g, $gamma_g);
color.b = pow(color.b, $gamma_b);
return color;
}
"""
_null_color_transform = "vec4 pass(vec4 color) { return color; }"
[docs]
class SIFTMultiChannelTiledGeolocatedMixin(SIFTTiledGeolocatedMixin):
def _normalize_data(self, data_arrays):
if not isinstance(data_arrays, (list, tuple)):
return super()._normalize_data(data_arrays)
new_data = []
for data in data_arrays:
new_data.append(super()._normalize_data(data))
return new_data
def _init_geo_parameters(
self,
origin_x,
origin_y,
cell_width,
cell_height,
projection,
texture_shape,
tile_shape,
wrap_lon,
shape,
data_arrays,
):
if shape is None:
shape = self._compute_shape(shape, data_arrays)
ndim = len(shape) or [x for x in data_arrays if x is not None][0].ndim
data = ArrayProxy(ndim, shape)
super()._init_geo_parameters(
origin_x,
origin_y,
cell_width,
cell_height,
projection,
texture_shape,
tile_shape,
wrap_lon,
shape,
data,
)
self.set_channels(
data_arrays,
shape=shape,
cell_width=cell_width,
cell_height=cell_height,
origin_x=origin_x,
origin_y=origin_y,
)
[docs]
def set_channels(self, data_arrays, shape=None, cell_width=None, cell_height=None, origin_x=None, origin_y=None):
assert shape or data_arrays is not None, "`data` or `shape` must be provided" # nosec B101
if cell_width is not None:
self.cell_width = cell_width
if cell_height:
self.cell_height = cell_height # Note: cell_height is usually negative
if origin_x:
self.origin_x = origin_x
if origin_y:
self.origin_y = origin_y
self.shape = self._compute_shape(shape, data_arrays)
assert None not in (self.cell_width, self.cell_height, self.origin_x, self.origin_y, self.shape) # nosec B101
# how many of the higher resolution channel tiles (smaller geographic area) make
# up a low resolution channel tile
self._channel_factors = tuple(
self.shape[0] / float(chn.shape[0]) if chn is not None else 1.0 for chn in data_arrays
)
self._lowest_factor = max(self._channel_factors)
self._lowest_rez = Resolution(
abs(self.cell_height * self._lowest_factor), abs(self.cell_width * self._lowest_factor)
)
# Where does this image lie in this lonely world
self.calc = TileCalculator(
self.name,
self.shape,
Point(x=self.origin_x, y=self.origin_y),
Resolution(dy=abs(self.cell_height), dx=abs(self.cell_width)),
self.tile_shape,
self.texture_shape,
wrap_lon=self.wrap_lon,
)
# Reset texture state, if we change things to know which texture
# don't need to be updated then this can be removed/changed
self.texture_state.reset()
self._need_texture_upload = True
self._need_vertex_update = True
# Reset the tiling logic to force a retile
# even though we might be looking at the exact same spot
self._latest_tile_box = None
@staticmethod
def _compute_shape(shape, data_arrays):
return shape or max(data.shape for data in data_arrays if data is not None)
def _get_stride(self, view_box):
s = self.calc.calc_stride(view_box, texture=self._lowest_rez)
return Point(np.int64(s[0] * self._lowest_factor), np.int64(s[1] * self._lowest_factor))
def _slice_texture_tile(self, data_arrays, y_slice, x_slice):
new_data = []
for data in data_arrays:
if data is not None:
# explicitly ask for the parent class of MultiBandTextureAtlas2D
data = super()._slice_texture_tile(data, y_slice, x_slice)
new_data.append(data)
return new_data
class _NoColormap:
"""Placeholder colormap class to make MultiChannelImageVisual compatible with ImageVisual."""
def texture_lut(self):
"""Get empty (None) colormap data."""
return None
[docs]
class MultiChannelImageVisual(ImageVisual):
"""Visual subclass displaying an image from three separate arrays.
Note this Visual uses only GPU scaling, unlike the ImageVisual base
class which allows for CPU or GPU scaling.
Parameters
----------
data : list
A 3-element list of numpy arrays with 2 dimensons where the
arrays are sorted by (R, G, B) order. These will be put together
to make an RGB image. The list can contain ``None`` meaning there
is no value for this channel currently, but it may be filled in
later. In this case the underlying GPU storage is still allocated,
but pre-filled with NaNs. Note that each channel may have different
shapes.
cmap : str | Colormap
Unused by this Visual, but is still provided to the ImageVisual base
class.
clim : str | tuple | list | None
Limits of each RGB data array. If provided as a string it must be
"auto" and the limits will be computed on the fly. If a 2-element
tuple then it will be considered the color limits for all channel
arrays. If provided as a 3-element list of 2-element tuples then
they represent the color limits of each channel array.
gamma : float | list
Gamma to use during colormap lookup. Final value will be computed
``val**gamma`` for each RGB channel array. If provided as a float then
it will be used for each channel. If provided as a 3-element tuple
then each value is used for the separate channel arrays. Default is
1.0 for each channel.
**kwargs : dict
Keyword arguments to pass to :class:`~vispy.visuals.ImageVisual`. Note
that this Visual does not allow for ``texture_format`` to be specified
and is hardcoded to ``r32f`` internal texture format.
"""
def __init__(self, data_arrays, clim="auto", gamma=1.0, **kwargs):
if kwargs.get("texture_format") is not None:
raise ValueError("'texture_format' can't be specified with the " "'MultiChannelImageVisual'.")
kwargs["texture_format"] = "R32F"
if kwargs.get("cmap") is not None:
raise ValueError("'cmap' can't be specified with the" "'MultiChannelImageVisual'.")
kwargs["cmap"] = None
self.num_channels = len(data_arrays)
super().__init__(data_arrays, clim=clim, gamma=gamma, **kwargs)
def _init_texture(self, data_arrays, texture_format):
if self._interpolation == "bilinear":
texture_interpolation = "linear"
else:
texture_interpolation = "nearest"
tex = MultiChannelGPUScaledTexture2D(
data_arrays,
internalformat=texture_format,
format="LUMINANCE",
interpolation=texture_interpolation,
)
return tex
def _get_shapes(self, data_arrays):
shapes = [x.shape for x in data_arrays if x is not None]
if not shapes:
raise ValueError("List of data arrays must contain at least one " "numpy array.")
return shapes
def _get_min_shape(self, data_arrays):
return min(self._get_shapes(data_arrays))
def _get_max_shape(self, data_arrays):
return max(self._get_shapes(data_arrays))
@property
def size(self):
"""Get size of the image (width, height)."""
return self._get_max_shape(self._data)[:2][::-1]
@property
def clim(self):
"""Get color limits used when rendering the image (cmin, cmax)."""
return self._texture.clim
@clim.setter
def clim(self, clims):
if isinstance(clims, str) or len(clims) == 2:
clims = [clims] * self.num_channels
if self._texture.set_clim(clims):
self._need_texture_upload = True
self._update_colortransform_clim()
self.update()
def _update_colortransform_clim(self):
if self._need_colortransform_update:
# we are going to rebuild anyway so just do it later
return
try:
norm_clims = self._texture.clim_normalized
except RuntimeError:
return
else:
clim_names = ("clim_r", "clim_g", "clim_b")
# shortcut so we don't have to rebuild the whole color transform
for clim_name, clim in zip(clim_names, norm_clims):
# shortcut so we don't have to rebuild the whole color transform
self.shared_program.frag["color_transform"][1][clim_name] = clim
@property
def gamma(self):
"""Get the gamma used when rendering the image."""
return self._gamma
@gamma.setter
def gamma(self, value):
"""Set gamma used when rendering the image."""
if not isinstance(value, (list, tuple)):
value = [value] * self.num_channels
if any(val <= 0 for val in value):
raise ValueError("gamma must be > 0")
self._gamma = tuple(float(x) for x in value)
gamma_names = ("gamma_r", "gamma_g", "gamma_b")
for gamma_name, gam in zip(gamma_names, self._gamma):
# shortcut so we don't have to rebuild the color transform
if not self._need_colortransform_update:
self.shared_program.frag["color_transform"][2][gamma_name] = gam
self.update()
@ImageVisual.cmap.setter
def cmap(self, cmap):
if cmap is not None:
raise ValueError("MultiChannelImageVisual does not support a colormap.")
self._cmap = _NoColormap()
def _build_interpolation(self):
# assumes 'nearest' interpolation
interpolation = self._interpolation
if interpolation != "nearest":
raise NotImplementedError("MultiChannelImageVisual only supports 'nearest' interpolation.")
texture_interpolation = "nearest"
self._data_lookup_fn = Function(_rgb_texture_lookup)
self.shared_program.frag["get_data"] = self._data_lookup_fn
if self._texture.interpolation != texture_interpolation:
self._texture.interpolation = texture_interpolation
self._data_lookup_fn["texture_r"] = self._texture.textures[0]
self._data_lookup_fn["texture_g"] = self._texture.textures[1]
self._data_lookup_fn["texture_b"] = self._texture.textures[2]
self._need_interpolation_update = False
def _build_color_transform(self):
if self.num_channels != 3:
raise NotImplementedError("MultiChannelImageVisuals only support 3 channels.")
else:
# RGB/A image data (no colormap)
fclim = Function(_apply_clim)
fgamma = Function(_apply_gamma)
fun = FunctionChain(None, [Function(_null_color_transform), fclim, fgamma])
fclim["clim_r"] = self._texture.textures[0].clim_normalized
fclim["clim_g"] = self._texture.textures[1].clim_normalized
fclim["clim_b"] = self._texture.textures[2].clim_normalized
fgamma["gamma_r"] = self.gamma[0]
fgamma["gamma_g"] = self.gamma[1]
fgamma["gamma_b"] = self.gamma[2]
return fun
[docs]
def set_data(self, data_arrays):
"""Set the data
Parameters
----------
image : array-like
The image data.
"""
if self._data is not None and any(self._shape_differs(x1, x2) for x1, x2 in zip(self._data, data_arrays)):
self._need_vertex_update = True
data_arrays = list(self._cast_arrays_if_needed(data_arrays))
self._texture.check_data_format(data_arrays)
self._data = data_arrays
self._need_texture_upload = True
@staticmethod
def _cast_arrays_if_needed(data_arrays):
# FIXME: Remove the try/except and move the remaining import to the imports section of this file as soon as
# support for VisPy < 0.11.0 is dropped.
for data in data_arrays:
try:
# Since VisPy v0.12.0
from vispy.gloo.texture import downcast_to_32bit_if_needed
if data is not None:
data = downcast_to_32bit_if_needed(data)
except ImportError:
# Before VisPy v0.12.0
from vispy.gloo.texture import should_cast_to_f32
if data is not None and should_cast_to_f32(data.dtype):
data = data.astype(np.float32)
yield data
@staticmethod
def _shape_differs(arr1, arr2):
none_change1 = arr1 is not None and arr2 is None
none_change2 = arr1 is None and arr2 is not None
shape_change = False
if arr1 is not None and arr2 is not None:
shape_change = arr1.shape[:2] != arr2.shape[:2]
return none_change1 or none_change2 or shape_change
def _build_texture(self):
pre_clims = self._texture.clim
pre_internalformat = self._texture.internalformat
self._texture.scale_and_set_data(self._data)
post_clims = self._texture.clim
post_internalformat = self._texture.internalformat
# color transform needs rebuilding if the internalformat was changed
# new color limits need to be assigned if the normalized clims changed
# otherwise, the original color transform should be fine
# Note that this assumes that if clim changed, clim_normalized changed
new_if = post_internalformat != pre_internalformat
new_cl = post_clims != pre_clims
if new_if or new_cl:
self._need_colortransform_update = True
self._need_texture_upload = False
MultiChannelImage = create_visual_node(MultiChannelImageVisual)
[docs]
class RGBCompositeImageVisual(
SIFTMultiChannelTiledGeolocatedMixin, TiledGeolocatedImageVisual, MultiChannelImageVisual
):
def _init_texture(self, data_arrays, texture_format):
if self._interpolation == "bilinear":
texture_interpolation = "linear"
else:
texture_interpolation = "nearest"
tex_shapes = [self.texture_shape] * len(data_arrays)
tex = MultiChannelTextureAtlas2D(
tex_shapes,
tile_shape=self.tile_shape,
interpolation=texture_interpolation,
format="LUMINANCE",
internalformat="R32F",
)
return tex
RGBCompositeImage = create_visual_node(RGBCompositeImageVisual)
[docs]
class ShapefileLinesVisual(LineVisual):
def __init__(self, filepath, double=False, **kwargs):
LOG.debug("Using border shapefile '%s'", filepath)
self.sf = shapefile.Reader(filepath)
LOG.info("Loading boundaries: %s", datetime.utcnow().isoformat(" "))
# Prepare the arrays
total_points = 0
total_parts = 0
for one_shape in self.sf.iterShapes():
total_points += len(one_shape.points)
total_parts += len(one_shape.parts)
vertex_buffer = np.empty((total_points * 2 - total_parts * 2, 2), dtype=np.float32)
prev_idx = 0
for one_shape in self.sf.iterShapes():
# end_idx = prev_idx + len(one_shape.points) * 2 - len(one_shape.parts) * 2
# vertex_buffer[prev_idx:end_idx:2] = one_shape.points[:-1]
# for part_idx in one_shape.parts:
for part_start, part_end in zip(one_shape.parts, list(one_shape.parts[1:]) + [len(one_shape.points)]):
end_idx = prev_idx + (part_end - part_start) * 2 - 2
vertex_buffer[prev_idx:end_idx:2] = one_shape.points[part_start : part_end - 1]
vertex_buffer[prev_idx + 1 : end_idx : 2] = one_shape.points[part_start + 1 : part_end]
prev_idx = end_idx
# Clip lats to +/- 89.9 otherwise PROJ.4 on mercator projection will fail
np.clip(vertex_buffer[:, 1], -89.9, 89.9, out=vertex_buffer[:, 1])
# vertex_buffer[:, 0], vertex_buffer[:, 1] = self.proj(vertex_buffer[:, 0], vertex_buffer[:, 1])
if double:
LOG.debug("Adding 180 to 540 double of shapefile")
orig_points = vertex_buffer.shape[0]
vertex_buffer = np.concatenate((vertex_buffer, vertex_buffer), axis=0)
# vertex_buffer[orig_points:, 0] += C_EQ
vertex_buffer[orig_points:, 0] += 360
kwargs.setdefault("color", (1.0, 1.0, 1.0, 1.0))
kwargs.setdefault("width", 1)
super().__init__(pos=vertex_buffer, connect="segments", **kwargs)
LOG.info("Done loading boundaries: %s", datetime.utcnow().isoformat(" "))
ShapefileLines = create_visual_node(ShapefileLinesVisual)
[docs]
class NEShapefileLinesVisual(ShapefileLinesVisual):
"""Visual class for handling shapefiles from Natural Earth.
http://www.naturalearthdata.com/
There should be no difference in the format of the file, but some
assumptions can be made with data from Natural Earth about filenaming,
data resolution, fields and other record information that is normally
included in most Natural Earth files.
"""
pass
NEShapefileLines = create_visual_node(NEShapefileLinesVisual)
[docs]
class PrecomputedIsocurveVisual(IsocurveVisual):
"""IsocurveVisual that can use precomputed paths."""
def __init__(self, verts, connects, level_indexes, levels, **kwargs):
num_zoom_levels = len(levels)
num_levels_per_zlevel = [len(x) for x in levels]
self._zoom_level_indexes = [
level_indexes[: sum(num_levels_per_zlevel[: z_level + 1])] for z_level in range(num_zoom_levels)
]
self._zoom_level_size = [sum(z_level_indexes) for z_level_indexes in self._zoom_level_indexes]
self._all_verts = []
self._all_connects = []
self._all_levels = []
self._zoom_level = -1
for zoom_level in range(num_zoom_levels):
end_idx = self._zoom_level_size[zoom_level]
self._all_verts.append(verts[:end_idx])
self._all_connects.append(connects[:end_idx])
self._all_levels.append([x for y in levels[: zoom_level + 1] for x in y])
super(PrecomputedIsocurveVisual, self).__init__(data=None, levels=levels, **kwargs)
self._data = True
self._level_min = 0
self.zoom_level = kwargs.pop("zoom_level", 0)
@property
def zoom_level(self):
return self._zoom_level
@zoom_level.setter
def zoom_level(self, val):
if val == self._zoom_level:
return
self._zoom_level = val
self._li = self._zoom_level_indexes[self._zoom_level]
self._connect = self._all_connects[self._zoom_level]
self._verts = self._all_verts[self._zoom_level]
# this will trigger all of the recomputation
self.levels = self._all_levels[self._zoom_level]
@property
def clim(self):
return self._clim
@clim.setter
def clim(self, val):
assert len(val) == 2 # nosec B101
self._clim = val
self._need_level_update = True
self._need_color_update = True
self.update()
def _compute_iso_line(self):
"""compute LineVisual vertices, connects and color-index"""
return
PrecomputedIsocurve = create_visual_node(PrecomputedIsocurveVisual)
class _GLGradientLineVisual(_GLLineVisual):
def __init__(self, arrow_size, *args, **kwargs):
self._arrow_size = arrow_size
super().__init__(*args, **kwargs)
def _prepare_draw(self, view): # noqa: C901
prof = Profiler()
if self._parent._changed["pos"]:
if self._parent._pos is None:
return False
# todo: does this result in unnecessary copies?
pos = np.ascontiguousarray(self._parent._pos.astype(np.float32))
xf = view.transforms.get_transform("visual", "framebuffer")
# transform pos to pixel coords
pos_px = xf.map(pos)
# subtract necessary offset
line_dirs = pos_px[1::2, 0:2] - pos_px[0::2, 0:2]
line_dirs_normed = (1.0 / (np.linalg.norm(line_dirs, axis=1))) * line_dirs
offset = self._arrow_size / 2.0
pos_px[1::2, 0:2] -= offset * line_dirs_normed
# tranform back to visual coords
pos_re = xf.imap(pos_px)[:, 0:2].astype(np.float32)
self._pos_vbo.set_data(pos_re)
# self._pos_vbo.set_data(pos)
self._program.vert["position"] = self._pos_vbo
self._program.vert["to_vec4"] = self._ensure_vec4_func(pos.shape[-1])
self._parent._changed["pos"] = False
if self._parent._changed["color"]:
color, cmap = self._parent._interpret_color()
# If color is not visible, just quit now
if isinstance(color, Color) and color.is_blank:
return False
if isinstance(color, Function):
# TODO: Change to the parametric coordinate once that is done
self._program.vert["color"] = color("(gl_Position.x + 1.0) / 2.0")
else:
if color.ndim == 1:
self._program.vert["color"] = color
else:
self._color_vbo.set_data(color)
self._program.vert["color"] = self._color_vbo
self._parent._changed["color"] = False
self.shared_program["texture2D_LUT"] = cmap and cmap.texture_lut()
# Do we want to use OpenGL, and can we?
self.update_gl_state(line_smooth=bool(self._parent._antialias))
px_scale = self.transforms.pixel_scale
width = px_scale * self._parent._width
self.update_gl_state(line_width=max(width, 1.0))
if self._parent._changed["connect"]:
self._connect = self._parent._interpret_connect()
if isinstance(self._connect, np.ndarray):
self._connect_ibo.set_data(self._connect)
self._parent._changed["connect"] = False
if self._connect is None:
return False
prof("prepare")
# Draw
if isinstance(self._connect, str) and self._connect == "strip":
self._draw_mode = "line_strip"
self._index_buffer = None
elif isinstance(self._connect, str) and self._connect == "segments":
self._draw_mode = "lines"
self._index_buffer = None
elif isinstance(self._connect, np.ndarray):
self._draw_mode = "lines"
self._index_buffer = self._connect_ibo
else:
raise ValueError("Invalid line connect mode: %r" % self._connect)
prof("draw")
[docs]
class GradientLineVisual(LineVisual):
"""Gradient line visual
Parameters
----------
pos : array
Array of shape (..., 2) or (..., 3) specifying vertex coordinates.
color : Color, tuple, or array
The color to use when drawing the line. If an array is given, it
must be of shape (..., 4) and provide one rgba color per vertex.
Can also be a colormap name, or appropriate `Function`.
width:
The width of the line in px. Line widths > 1px are only
guaranteed to work when using 'agg' method.
connect : str or array
Determines which vertices are connected by lines.
* "strip" causes the line to be drawn with each vertex
connected to the next.
* "segments" causes each pair of vertices to draw an
independent line segment
* numpy arrays specify the exact set of segment pairs to
connect.
method : str
Mode to use for drawing.
* "agg" uses anti-grain geometry to draw nicely antialiased lines
with proper joins and endcaps.
* "gl" uses OpenGL's built-in line rendering. This is much faster,
but produces much lower-quality results and is not guaranteed to
obey the requested line width or join/endcap styles.
antialias : bool
Enables or disables antialiasing.
For method='gl', this specifies whether to use GL's line smoothing,
which may be unavailable or inconsistent on some platforms.
"""
def __init__(
self,
pos=None,
color=(0.5, 0.5, 0.5, 1),
width=1,
arrow_size=None,
connect="strip",
method="gl",
antialias=False,
):
self._line_visual = None
self._changed = {"pos": False, "color": False, "width": False, "connect": False}
self._pos = None
self._color = None
self._width = None
self._arrow_size = arrow_size
self._connect = None
self._bounds = None
self._antialias = None
self._method = "none"
super(LineVisual, self).__init__([])
# don't call subclass set_data; these often have different
# signatures.
LineVisual.set_data(self, pos=pos, color=color, width=width, connect=connect)
self.antialias = antialias
self.method = method
[docs]
def method(self, method):
if method not in ("agg", "gl"):
raise ValueError('method argument must be "agg" or "gl".')
if method == self._method:
return
self._method = method
if self._line_visual is not None:
self.remove_subvisual(self._line_visual)
if method == "gl":
self._line_visual = _GLGradientLineVisual(self, self._arrow_size)
elif method == "agg":
self._line_visual = _AggLineVisual(self)
self.add_subvisual(self._line_visual)
for k in self._changed:
self._changed[k] = True
class _TipAlignedArrowHeadVisual(_ArrowHeadVisual):
def __index__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def _prepare_draw(self, view):
if self._parent._arrows_changed:
self._prepare_vertex_data(view)
self.shared_program.bind(self._arrow_vbo)
self.shared_program["antialias"] = 0.0
self.shared_program.frag["arrow_type"] = self._parent.arrow_type
self.shared_program.frag["fill_type"] = "filled"
def _prepare_vertex_data(self, view):
arrows = self._parent.arrows
xf = view.transforms.get_transform("visual", "framebuffer")
if arrows is None or arrows.size == 0:
self._arrow_vbo = VertexBuffer(np.array([], dtype=self._arrow_vtype))
return
# arrows present in (N/2 x 4), need (N x 2) where N is number of
# vertices
arrows = arrows.reshape(-1, 2)
# transform arrow positions to pixel coords
arrows_px = xf.map(arrows)
# subtract necessary offset
arrow_dirs = arrows_px[1::2, 0:2] - arrows_px[0::2, 0:2]
arrow_dirs_normed = (1.0 / (np.linalg.norm(arrow_dirs, axis=1)))[:, np.newaxis] * arrow_dirs
offset = self._parent.arrow_size / 2.0
arrows_px[1::2, 0:2] -= offset * arrow_dirs_normed
# tranform back to visual coords
arrows_re = xf.imap(arrows_px)[:, 0:2].astype(np.float32)
# arrows now again needed in (N/2 x 4)
arrows = arrows_re.reshape((-1, 4))
v = np.zeros(len(arrows), dtype=self._arrow_vtype)
# 2d // 3d v1 v2.
sh = int(arrows.shape[1] / 2)
v["v1"] = as_vec4(arrows[:, 0:sh])
v["v2"] = as_vec4(arrows[:, sh : int(2 * sh)])
v["size"][:] = self._parent.arrow_size
color, cmap = self._parent._interpret_color(self._parent.arrow_color)
v["color"][:] = color
v["linewidth"][:] = self._parent.width
self._arrow_vbo = VertexBuffer(v)
[docs]
class TipAlignedArrowVisual(ArrowVisual):
"""
Almost exactly the same as vispy's ArrowVisual except
for the arrow's head not being centered at the end of the arrow's
line but the arrows tip pointing to the coordinate of the arrow's line.
Parameters
----------
pos : array
Array of shape (..., 2) or (..., 3) specifying vertex coordinates.
color : Color, tuple, or array
The color to use when drawing the line. If an array is given, it
must be of shape (..., 4) and provide one rgba color per vertex.
Can also be a colormap name, or appropriate `Function`.
width:
The width of the line in px. Line widths > 1px are only
guaranteed to work when using 'agg' method.
connect : str or array
Determines which vertices are connected by lines.
* "strip" causes the line to be drawn with each vertex
connected to the next.
* "segments" causes each pair of vertices to draw an
independent line segment
* numpy arrays specify the exact set of segment pairs to
connect.
method : str
Mode to use for drawing.
* "agg" uses anti-grain geometry to draw nicely antialiased lines
with proper joins and endcaps.
* "gl" uses OpenGL's built-in line rendering. This is much faster,
but produces much lower-quality results and is not guaranteed to
obey the requested line width or join/endcap styles.
antialias : bool
Enables or disables antialiasing.
For method='gl', this specifies whether to use GL's line smoothing,
which may be unavailable or inconsistent on some platforms.
arrows : array
A (N, 4) or (N, 6) matrix where each row contains the (x, y) or the
(x, y, z) coordinate of the first and second vertex of the arrow
body. Remember that the second vertex is used as center point for
the arrow head, and the first vertex is only used for determining
the arrow head orientation.
arrow_type : string
Specify the arrow head type, the currently available arrow head types
are:
* stealth
* curved
* triangle_30
* triangle_60
* triangle_90
* angle_30
* angle_60
* angle_90
* inhibitor_round
arrow_size : float
Specify the arrow size
arrow_color : Color, tuple, or array
The arrow head color. If an array is given, it must be of shape
(..., 4) and provide one rgba color per arrow head. Can also be a
colormap name, or appropriate `Function`.
"""
def __init__(
self,
pos=None,
color=(0.5, 0.5, 0.5, 1),
width=1,
connect="strip",
method="gl",
antialias=False,
arrows=None,
arrow_type="stealth",
arrow_size=None,
arrow_color=(0.5, 0.5, 0.5, 1),
):
# Do not use the self._changed dictionary as it gets overwritten by
# the LineVisual constructor.
self._arrows_changed = False
self._arrow_type = None
self._arrow_size = None
self._arrows = None
self.arrow_type = arrow_type
self.arrow_size = arrow_size
self.arrow_color = arrow_color
self.arrow_head = _TipAlignedArrowHeadVisual(self)
# TODO: `LineVisual.__init__` also calls its own `set_data` method,
# which triggers an *update* event. This results in a redraw. After
# that we call our own `set_data` method, which triggers another
# redraw. This should be fixed.
GradientLineVisual.__init__(self, pos, color, width, arrow_size, connect, method, antialias)
TipAlignedArrowVisual.set_data(self, arrows=arrows)
# Add marker visual for the arrow head
self.add_subvisual(self.arrow_head)
[docs]
class LinesVisual(TipAlignedArrowVisual):
default_colors = {
"red": (1.0, 0.0, 0.0, 1.0),
"green": (0.0, 1.0, 0.0, 1.0),
}
def __init__(self, arrows: np.ndarray, colors: Optional[np.ndarray] = None):
# if colors not set, use green-red gradient
# if only one color given, use only one color
# otherwise two colors need to be provided by caller, one for each point
# in point pairs
n_points, n_coordinates = arrows.shape
if n_coordinates != 4:
raise AttributeError("Expected 4 coordinates per arrow.")
points = arrows.reshape(-1, 2)
# Remember that the second vertex is used as center point for the
# arrow head, and the first vertex is only used for determining
# the arrow head orientation.
if colors is None:
colors = np.array([np.array(self.default_colors["green"]), np.array(self.default_colors["red"])])
colors = np.tile(colors, (n_points, 1))
elif colors.ndim == 1 and colors.size == 4:
pass
elif colors.ndim == 2 and colors.shape[0] == 2:
colors = np.tile(colors, (n_points, 1))
else:
raise AttributeError(
"Too many colors provided or colors ill-formed, " "provide at-most two colors in RGBA format."
)
super().__init__(
pos=points,
arrows=arrows,
arrow_type="triangle_30",
color=colors,
arrow_color="w",
arrow_size=10,
connect="segments",
method="gl",
)
Lines = create_visual_node(LinesVisual)