Source code for python.task_manager.background_task

# Copyright (c) 2015 Shotgun Software Inc.
#
# CONFIDENTIAL AND PROPRIETARY
#
# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit
# Source Code License included in this distribution package. See LICENSE.
# By accessing, using, copying or modifying this work you indicate your
# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights
# not expressly granted therein are reserved by Shotgun Software Inc.

"""
"""
import traceback

import sgtk
from sgtk.platform.qt import QtCore
from sgtk import TankError

[docs]class BackgroundTask(object): """ Container class for a single task. A task is a Python callable (function/method/class) that takes some arguments, does some work and returns its result. Each task will be run in a thread and tasks will be executed in priority order. For example:: def task_fetch_status(): return status_of_something() ... task_manager.add_task(task_fetch_status) ... def on_task_completion(task, group, result): status = result # do something with the status Additionally, tasks can be chained together so that the output of one task can be passed directly to the input of one or more downstream tasks. To achieve this, the upstream task must returns it's result as a dictionary and this dictionary is added to the named parameters of any downstream tasks by the task manager. Care should be taken when constructing these tasks that the result of one upstream task doesn't unintentionally overwrite any existing named parameters for a downstream task. For example:: def task_fetch_status(): return {"status":status_of_something()} def task_do_something(status): result = None if status: result = result_of_doing_something() return result ... status_task_id = task_manager.add_task(task_fetch_status, priority=1) work_task_id = task_manager.add_task(task_do_something, priority=2, upstream_task_ids = [status_task_id]) ... def on_task_completion(task, group, result): if task.id = work_task_id: # do something with the result ... Upstream tasks can be fed into multiple down-stream tasks and the task priority can also be different so for example all status fetches could be set to happen before all do-somethings by setting the priority accordingly. Down-stream tasks will also not start before it's upstream tasks have completed. """ def __init__(self, task_id, cbl, group, priority, args, kwargs): """ Construction. :param task_id: The unique id for this task :param cbl: Callable to execute to perform the task :param group: The group that this task belongs to :param priority: The priority this task should be run with :param args: Additional arguments that should be passed to func :param kwargs: Additional named arguments that should be passed to func """ self._uid = task_id self._cbl = cbl self._args = args or [] self._kwargs = kwargs or {} self._group = group self._priority = priority def __repr__(self): """ Create a string representation of this instance :returns: A string representation of this instance """ return "[%d, G:%s, P:%s] %s" % (self._uid, self._group, self._priority, self._cbl.__name__) @property def uid(self): """ :returns: The unique id of this task """ return self._uid @property def group(self): """ :returns: The group this task belongs to """ return self._group @property def priority(self): """ :returns: The priority for this task """ return self._priority
[docs] def append_upstream_result(self, result): """ Append the result from an upstream task to this tasks kwargs. In order for the result to be appended to the task it must be a dictionary. Each entry in the result dictionary is then added to the tasks named parameters so care should be taken when building the tasks that named parameters for a downstream task are not unintentionally overwritten by the result of an upstream task. :param result: A dictionary containing the result from an upstream task. If result is not a dictionary then it will be ignored. """ if result and isinstance(result, dict): self._kwargs = dict(self._kwargs.items() + result.items())
[docs] def run(self): """ Perform this task :returns: The result of performing the task """ return self._cbl(*self._args, **self._kwargs)