#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
queue.py
~~~~~~~~
PURPOSE
Global background task queue for loading, rendering, et cetera.
Use TheQueue.add() to create background behavior.
Note that Qt4 facilities other than signals should not be used on the task queue!
REFERENCES
REQUIRES
:author: R.K.Garcia <rayg@ssec.wisc.edu>
:copyright: 2014 by University of Wisconsin Regents, see AUTHORS for more details
:license: GPLv3, see LICENSE for more details
"""
__author__ = "rayg"
__docformat__ = "reStructuredText"
import logging
from collections import OrderedDict
from PyQt5.QtCore import QObject, QThread, pyqtSignal
LOG = logging.getLogger(__name__)
# keys for status dictionaries
TASK_DOING = ("activity", str)
TASK_PROGRESS = ("progress", float) # 0.0 - 1.0 progress
# singleton instance used by clients
TheQueue = None
[docs]
class Worker(QThread):
"""
Worker thread use by TaskQueue
"""
# worker id, sequence of dictionaries listing update information to be propagated to view
workerDidMakeProgress = pyqtSignal(int, list)
# task-key, ok: False if exception occurred else True
workerDidCompleteTask = pyqtSignal(str, bool)
def __init__(self, myid: int):
super(Worker, self).__init__()
self.queue: OrderedDict = OrderedDict()
self.depth = 0
self.id = myid
[docs]
def add(self, key, task_iterable):
# FUTURE: replace queued task if key matches
self.queue[key] = task_iterable
self.depth = len(self.queue)
self.start()
def _did_progress(self, task_status):
"""
Summarize the task queue, including progress, and send it out as a signal
:param task_status:
:return:
"""
# FIXME: this should have entries for the upcoming stuff as well so we can have an Activity panel
info = [task_status] if task_status else []
self.workerDidMakeProgress.emit(self.id, info)
[docs]
def run(self):
while len(self.queue) > 0:
key, task = self.queue.popitem(last=False)
# LOG.debug('starting background work on {}'.format(key))
ok = True
try:
for status in task:
self._did_progress(status)
except Exception:
# LOG.error("Background task failed")
LOG.error("Background task exception: ", exc_info=True)
ok = False
self.workerDidCompleteTask.emit(key, ok)
self.depth = 0
self._did_progress(None)
[docs]
class TaskQueue(QObject):
"""
Global background task queue for loading, rendering, et cetera.
Includes state updates and GUI links.
Eventually will include thread pools and multiprocess pools.
Two threads for interactive tasks (high priority), one thread for background tasks (low priority): 0, 1, 2
"""
didMakeProgress = pyqtSignal(list) # sequence of dictionaries listing update information to be propagated to view
# started : inherited
# finished : inherited
# terminated : inherited
def __init__(self, process_pool=None):
super(TaskQueue, self).__init__()
self._interactive_round_robin = 0 # swaps between 0 and 1 for interactive tasks
self.process_pool = process_pool # thread pool for background activity
self._completion_futures = {} # dictionary of id(task) : completion(bool)
self._last_status = [] # list of last status reports for different workers
self.workers = [] # thread pool for background activity
for id in range(3):
worker = Worker(id)
worker.workerDidMakeProgress.connect(self._did_progress)
worker.workerDidCompleteTask.connect(self._did_complete_task)
self.workers.append(worker)
self._last_status.append(None)
global TheQueue
assert TheQueue is None # nosec B101
TheQueue = self
@property
def depth(self):
return sum([x.depth for x in self.workers])
@property
def remaining(self):
return sum([len(x.queue) for x in self.workers])
[docs]
def add(
self,
key,
task_iterable,
description,
interactive=False,
and_then=None,
use_process_pool=False,
use_thread_pool=False,
):
"""Add an iterable task which will yield progress information dictionaries.
Expect behavior like this::
for task in queue:
for status_info in task:
update_display(status_info)
pop_display(final_status_info)
Args:
key (str): unique key for task. Queuing the same key will result in the old task being removed
and the new one deferred to the end
task_iterable (iter): callable resulting in an iterable, or an iterable itself to be run on the background
"""
if interactive:
wdex = self._interactive_round_robin
self._interactive_round_robin += 1
self._interactive_round_robin %= 2 # TODO(nk) worker count is hardcoded: worker_count-1
else:
wdex = 2
if callable(and_then):
self._completion_futures[key] = and_then
self.workers[wdex].add(key, task_iterable)
def _did_progress(self, worker_id, worker_status):
"""
Summarize the task queue, including progress, and send it out as a signal
:param worker_status: list of active items, or at least the thing it's working on now
:return:
"""
# FIXME: this should consolidate entries for the upcoming stuff as well so we can have an Activity panel
self._last_status[worker_id] = worker_status
# report on the lowest worker number that's active; (0,1 interactive; 2 background)
# yes, this will be redundant
# FUTURE make this a more useful signal content, rather than relying on progress_ratio back-query
for wdex, status in enumerate(self._last_status):
if self.workers[wdex].isRunning() and status is not None:
self.didMakeProgress.emit(status)
return
# otherwise this is a notification that we're finally at full idle
self.didMakeProgress.emit([{TASK_DOING: "", TASK_PROGRESS: 0.0}])
# FUTURE: consider one progress bar per worker
def _did_complete_task(self, task_key: str, succeeded: bool):
# LOG.debug("background task complete!")
todo = self._completion_futures.pop(task_key, None)
if callable(todo):
LOG.debug("completed task {}, and_then we do this...".format(succeeded))
todo(succeeded)
# else:
# LOG.debug("nothing further to do <{}>".format(repr(todo)))
[docs]
def progress_ratio(self, current_progress=None):
depth = self.depth
if depth == 0:
return 0.0
elif depth == 1 and current_progress is not None:
# show something other than 50% if there is only 1 job
return current_progress
else:
depth, remaining = self.depth, self.remaining
return float(depth - remaining) / depth