conductor.lib.downloader2 module

Command Line Process to run downloads.

class conductor.lib.downloader2.Downloader(thread_count=None, location=None, output_dir=None)

Bases: object

A Downloader daemon which downloads completed frames from finished tasks.

Each task has an associated Download entity that represents all images/files that were produced from that task. A task may have more than one file to download.

  1. Query the app for the “next” Download to download. The app will return the Download that it deems most appropriate (probably the one with the lowest jobid). Note that the app will automatically change the status of that Download entity to “downloading”. This is probably a terrible model (but it’s what I’m inheriting). Note that there is a cron job that resets Download entities that are in the “downloading” state but have not “dialed home” after x amount of time.
  2. Place this Download in the queue to be downloaded.
  3. Spawn threads to actively take “work” (Downloads) from the queue.
  4. Each thread is responsible for downloading the Download that it took from the queue.
  5. Each thread is responsible for updating the app with its downloading status: “downloading”, etc.
  6. Each thread is responsible for catching its own exceptions and placing the Download back into the queue.
  7. Because the downloader is threaded, it makes communication/tracking of data more difficult. In order to facilitate this, ThreadState objects are shared across threads to read/write data to. This allows download progress to be communicated from multiple threads into a single object, which can then be read from a single process and “summarized”.
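
The steps above can be sketched roughly as follows. This is a minimal illustration in Python 3, not the module's actual code: the worker function, the dict-based download records, the fake_download placeholder, and the stand-in ThreadState class are all assumptions made for the example.

```python
import queue
import threading


class ThreadState(object):
    """Hypothetical stand-in: a plain mutable object shared across threads."""

    def __init__(self):
        self.download_id = None
        self.status = "QUEUED"


def fake_download(download):
    """Placeholder for the real transfer; fails once if flagged 'flaky'."""
    if download.pop("flaky", False):
        raise IOError("simulated failure")


def worker(pending, state, completed):
    """Take Downloads from the queue until a None sentinel arrives."""
    while True:
        download = pending.get()
        if download is None:
            pending.task_done()
            return
        state.download_id = download["id"]
        state.status = "DOWNLOADING"
        try:
            fake_download(download)
            state.status = "COMPLETE"
            completed.append(download["id"])
        except Exception:
            # Step 6: catch our own exception and requeue the Download.
            state.status = "ERROR"
            pending.put(download)
        finally:
            pending.task_done()


def run(downloads, thread_count=2):
    pending = queue.Queue()
    completed = []
    # One state object per thread, readable by a summarizing thread.
    states = [ThreadState() for _ in range(thread_count)]
    threads = [threading.Thread(target=worker, args=(pending, s, completed))
               for s in states]
    for thread in threads:
        thread.start()
    for download in downloads:
        pending.put(download)
    pending.join()           # wait until every Download has been handled
    for _ in threads:
        pending.put(None)    # one sentinel per worker
    for thread in threads:
        thread.join()
    return sorted(completed), states
```

Calling run([{"id": 1}, {"id": 2, "flaky": True}, {"id": 3}]) completes all three ids; the flaky one fails once, is put back on the queue, and succeeds on the retry.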

Each TaskDownloadState holds a list of FileDownloadState objects, e.g. TaskDownloadState.files = [FileDownloadState, FileDownloadState].

To complicate matters further, if the downloader is killed (via keyboard, etc.), it should clean up after itself. So we need to catch the SIGINT/EXIT signal at any point in the code and handle it gracefully.
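
One common way to approach this kind of graceful shutdown is to have the signal handler set a shared threading.Event that every loop checks. The sketch below (Python 3) only illustrates that pattern; the function names are hypothetical and the module's real handler may work differently.

```python
import signal
import threading


def make_sigint_handler(stop_event):
    """Build a handler that only flags the shutdown; the loops do the cleanup."""
    def handler(signum, frame):
        stop_event.set()
    return handler


def install(stop_event):
    """Register the handler for SIGINT (must be called from the main thread)."""
    signal.signal(signal.SIGINT, make_sigint_handler(stop_event))


def process_until_stopped(items, stop_event):
    """Process items until done or until stop_event is set; always clean up."""
    processed = []
    cleaned_up = False
    try:
        for item in items:
            if stop_event.is_set():
                break
            processed.append(item)
    finally:
        # Cleanup would go here, e.g. reporting unfinished Downloads
        # back to the app so they can be picked up again.
        cleaned_up = True
    return processed, cleaned_up
```

Keeping the handler itself trivial avoids doing real work inside the signal context; the worker loops notice the event and run their own cleanup.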

add_to_history(file_download_state)
construct_active_downloads_summary(task_download_states)

For example:

#### ACTIVE DOWNLOADS #####

Job 08285 Task 004 - 80% (200/234MB)  - Thread-1
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/beauty/deep_lidar.deep.0005.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/data/deep_lidar.deep.0005.exr



Job 08285 Task 003 - 20%  (20/234MB) - Thread-2
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01142.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01074.exr

construct_file_downloads_history_summary(file_download_history)

For example:

#### DOWNLOAD HISTORY #####


6227709558521856 Job 08285 Task 001 20MB  CACHED /work/renders/light_v001/beauty/deep_lidar.deep.0005.exr  <timestamp>
7095580349853434 Job 08285 Task 002 10MB  DL     /work/renders/spider_fly01/beauty/deep_lidar.deep.0005.exr  <timestamp>
5343402947290140 Job 08284 Task 001 5MB   DL     /work/renders/light_v002/data/light_002.deep.0005.exr  <timestamp>

download_file(**kwargs)

For the given file information, download the file to disk. Check whether the file already exists and matches the expected md5 before downloading.
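
The "already exists and matches the expected md5" check could look something like the sketch below (Python 3). The helper names are hypothetical, and the example assumes the expected md5 is a hex digest; the real service may encode it differently.

```python
import hashlib
import os


def file_md5(path, chunk_size=1024 * 1024):
    """Hash the file in chunks so large frames never sit fully in memory."""
    md5 = hashlib.md5()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            md5.update(chunk)
    return md5.hexdigest()


def needs_download(path, expected_md5):
    """Return True unless a file at path already has the expected md5."""
    if not os.path.isfile(path):
        return True
    return file_md5(path) != expected_md5
```

A file for which needs_download returns False can be skipped and reused as-is.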

classmethod download_jobs(job_ids, task_id=None, thread_count=None, output_dir=None)

Run the downloader for explicit jobs, and terminate afterwards.

download_progess_polling = 2
download_target(**kwds)

This function is called in a new thread (and many threads may be executing this at a single time). This function is responsible for downloading a single Download (essentially an entity in Datastore which represents the output data from a single Task). This may consist of many files.

This function pulls one Download from the pending queue and attempts to download it. If it fails, it places it back on the queue.

The function also spawns a child thread that is responsible for constantly updating the app with the status of the Download, such as:

  • status (e.g. “pending”, “downloading”, “downloaded”)

  • bytes transferred (the total amount of bytes that have been transferred for the Download).

    Note that these bytes encompass all of the bytes that have been transferred for ALL of the files that are part of the Download (as opposed to only a single file)

Parameters:task_download_state

A class, serving as a global mutable object, which allows this thread to report data about its Download state, so that other threads can read and output that data. This object is persistent for each thread, and is reused every time this function is called, for each Task that a thread downloads.

It’s important that the state information is wiped clean (reset) every time a new task begins. This is the responsibility of this function.
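
As a rough illustration of the reporting and reset behavior described above, the sketch below (Python 3) polls a shared state object from a child thread and sums the bytes across all files of the Download. FileState, TaskState, reporter_loop, and the report callback are hypothetical names used only for this example.

```python
import threading


class FileState(object):
    def __init__(self):
        self.bytes_downloaded = 0


class TaskState(object):
    """Hypothetical shared state: reused by one thread for task after task."""

    def __init__(self):
        self.reset()

    def reset(self):
        # Wipe everything so data from the previous task cannot leak through.
        self.files = []
        self.status = "QUEUED"

    def get_bytes_downloaded(self):
        # Total across ALL files of the Download, not a single file.
        return sum(f.bytes_downloaded for f in self.files)


def reporter_loop(task_state, stop_event, report, interval=2.0):
    """Report status and total bytes every `interval` seconds until stopped."""
    while not stop_event.is_set():
        report(task_state.status, task_state.get_bytes_downloaded())
        stop_event.wait(interval)
    # One final report so the last known state is always delivered.
    report(task_state.status, task_state.get_bytes_downloaded())
```

In a real downloader the report callback would send the values to the app; here it is just any callable taking a status and a byte count.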

endpoint_downloads_job = '/downloads/%s'
endpoint_downloads_next = '/downloads/next'
endpoint_downloads_status = '/downloads/status'
get_jobs_downloads(job_ids, task_id)
get_next_downloads(**kw)
history_queue_max = 100
md5_progess_polling = 2
nap()
naptime = 15
print_summary(thread_states, interval)

Print summary of download threads.

  • Total threads running
  • Thread names
  • Total download threads
  • Last 20 files downloaded
  • Last 20 tasks downloaded
  • Last 20 Downloads downloaded
  • Currently downloading jobs
  • Currently downloading files
  • Currently downloading Downloads

For example:

####### SUMMARY #########################
Active Thread Count: 14

Threads:
    ErrorThread
    MainThread
    MetricStore
    ProgressThread
    ProgressThread
    ProgressThread
    ProgressThread
    ProgressThread
    ProgressThread
    QueueThread
    Thread-1
    Thread-2
    Thread-3
    Thread-4
    Thread-5


#### ACTIVE DOWNLOADS #####

Job 08285 Task 004 - 80% (200/234MB)  - Thread-1
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/beauty/deep_lidar.deep.0005.exr  HASHING EXISTING FILE     80%
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/data/deep_lidar.deep.0005.exr    DOWNLOADING 20%



Job 08285 Task 003 - 20%  (20/234MB) - Thread-2
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01142.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01074.exr


#### PENDING DOWNLOADS #####
    Job 08285 Task 006
    Job 08285 Task 007
    Job 08285 Task 008
    Job 08285 Task 009




#### HISTORY ####

Last 20 files downloaded:

    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01142.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01074.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01038.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01111.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01087.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01143.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01095.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01156.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01016.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01039.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01130.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01030.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01015.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01138.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01063.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01006.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01065.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01096.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01055.exr

Last 20 tasks downloaded
    Job 08285 Task 004 (Download 3492394234)
    Job 08285 Task 002 (Download 3492394234)
    Job 08285 Task 003 (Download 3492394234)
    Job 08285 Task 001 (Download 3492394234)
    Job 08285 Task 000 (Download 3492394234)
    Job 08284 Task 065 (Download 3492394234)
    Job 08283 Task 064 (Download 3492394234)
    Job 08283 Task 063 (Download 3492394234)
    Job 08282 Task 032 (Download 3492394234)
    Job 08282 Task 025 (Download 3492394234)
    Job 08282 Task 001 (Download 3492394234)

####################

print_uptime()

Return the amount of time that the downloader has been running, e.g. “0:01:28”.
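
A string in that format can be produced with datetime.timedelta. The sketch below is only an assumption about how such a value might be computed; format_uptime and its parameters are hypothetical.

```python
import datetime
import time


def format_uptime(start_time, now=None):
    """Format the seconds elapsed since start_time as H:MM:SS."""
    now = time.time() if now is None else now
    # Truncate to whole seconds so no microseconds appear in the string.
    seconds = int(now - start_time)
    return str(datetime.timedelta(seconds=seconds))


print(format_uptime(0, now=88))  # prints 0:01:28
```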

queue_target(pending_queue, downloading_queue)

Fill the download queue by querying the app for the “next” Download. Only fill the queue to have as many items as there are threads.

Perpetually run this function until the daemon has been terminated.
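
A single pass of that fill logic might look like the sketch below (Python 3). fill_queue and fetch_next are hypothetical names; fetch_next stands in for the request to the app and is assumed to return None when nothing is available. The real method would repeat this in a loop, sleeping between passes.

```python
import queue


def fill_queue(pending, fetch_next, thread_count):
    """Top the queue up to thread_count items; return how many were added."""
    added = 0
    while pending.qsize() < thread_count:
        download = fetch_next()
        if download is None:
            # The app has nothing for us right now; try again next pass.
            break
        pending.put(download)
        added += 1
    return added
```

Capping the queue at the thread count keeps the daemon from claiming Downloads that it cannot start working on right away.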

report_download_status(task_download_state)
reporter_target(task_download_state, downloader_thread)
start(job_ids=None, task_id=None, summary_interval=10)
classmethod start_daemon(thread_count=None, location=None, output_dir=None, summary_interval=10)

Run the downloader as a daemon

start_download_threads(downloading_queue, pending_queue)
start_queue_thread()

Start and return a thread that is responsible for pinging the app for Downloads to download (and populating the queue)

start_reporter_thread(download_data)
start_summary_thread(thread_states, interval)

Start and return a thread that is responsible for periodically printing the summary of the download threads (see print_summary).

start_time = None
class conductor.lib.downloader2.FileDownloadState

Bases: conductor.lib.downloader2.ThreadState

STATE_COMPLETE = 'COMPLETE'
STATE_DOWNLOADING = 'DOWNLOADING'
STATE_HASHING_EXISTING_FILE = 'HASHING EXISTING FILE'
STATE_PREPARING_DIRECTORY = 'PREPARING DIRECTORY'
STATE_QUEUED = 'QUEUED'
bytes_downloaded = 0
download_id = ''
file_info = None
filepath = None
get_duration()

Return the amount of time in seconds

hash_progress = 0
status = 'QUEUED'
thread_name = ''
time_completed = None
time_started = None
use_existing = False
class conductor.lib.downloader2.HistoryTableStr(data, column_names, title='', footer='', upper_headers=True)

Bases: conductor.lib.loggeria.TableStr

human_bytes(bytes_, ljust=0, rjust=0)
human_time(timestamp, ljust=0, rjust=0)
class conductor.lib.downloader2.TaskDownloadState

Bases: conductor.lib.downloader2.ThreadState

ENTITY_STATES = {'COMPLETE': 'downloaded', 'DOWNLOADING': 'downloading', 'ERROR': 'pending', 'HASHING EXISTING FILE': 'downloading', 'PREPARING DIRECTORY': 'downloading', 'QUEUED': 'downloading'}
STATE_COMPLETE = 'COMPLETE'
STATE_DOWNLOADING = 'DOWNLOADING'
STATE_ERROR = 'ERROR'
STATE_HASHING_EXISTING_FILE = 'HASHING EXISTING FILE'
STATE_PREPARING_DIRECTORY = 'PREPARING DIRECTORY'
STATE_QUEUED = 'QUEUED'
get_bytes_downloaded()
get_duration()

Return the amount of time in seconds

get_entity_status()

Return corresponding entity status for the TaskDownloadState’s current status
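
Given the ENTITY_STATES table listed above, the lookup presumably amounts to a dictionary access. The sketch below copies that table and uses a free function in place of the method, purely for illustration.

```python
# Copied from the ENTITY_STATES attribute documented above.
ENTITY_STATES = {
    'COMPLETE': 'downloaded',
    'DOWNLOADING': 'downloading',
    'ERROR': 'pending',
    'HASHING EXISTING FILE': 'downloading',
    'PREPARING DIRECTORY': 'downloading',
    'QUEUED': 'downloading',
}


def get_entity_status(status):
    """Map a local thread status to the status stored on the Download entity."""
    return ENTITY_STATES[status]


print(get_entity_status('HASHING EXISTING FILE'))  # prints downloading
print(get_entity_status('ERROR'))                  # prints pending
```

Note that ERROR maps to “pending” rather than to a failure status, which would put a failed Download back in line to be handed out again.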

initialize(task_download)

Reset/initialize the properties

reset()
reset_bytes_downloaded()
class conductor.lib.downloader2.ThreadState

Bases: object

Use a class as mutable datatype so that it can be used across threads

conductor.lib.downloader2.chmod(*args, **kwargs)
conductor.lib.downloader2.dec_random_exception(percentage_chance)

DECORATOR for creating random exceptions for the wrapped function. This is used for simulating errors to test downloader recovery behavior/robustness
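
A decorator of this kind can be sketched as follows. This is an illustration only: it assumes percentage_chance is a number from 0 to 100, and the exception type and message are made up for the example.

```python
import functools
import random


def dec_random_exception(percentage_chance):
    """Raise an exception in roughly percentage_chance percent of calls."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # random.random() is in [0, 1), so 0 never raises and 100 always does.
            if random.random() * 100 < percentage_chance:
                raise RuntimeError(
                    "Random exception (%s%% chance)" % percentage_chance)
            return func(*args, **kwargs)
        return wrapper
    return decorator


@dec_random_exception(percentage_chance=0)
def always_ok():
    return "ok"
```

Wrapping a download function with, say, a 10 percent chance makes it easy to confirm that failed Downloads are requeued and retried.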

conductor.lib.downloader2.delete_file(*args, **kwargs)
conductor.lib.downloader2.download_file(*args, **kwargs)
Parameters:state (class) – A class with a .bytes_downloaded property that reflects the number of bytes downloaded so far. Other threads can use this to report “progress”. Note that this must be a mutable object (hence a class), so that this function and other threads read from and write to the same object.
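
The role of the state parameter can be illustrated with a chunked copy that updates .bytes_downloaded as it goes. The sketch below uses in-memory streams in place of a real HTTP response and file, and both State and copy_with_progress are hypothetical names.

```python
import io


class State(object):
    """Hypothetical mutable progress holder shared with other threads."""

    def __init__(self):
        self.bytes_downloaded = 0


def copy_with_progress(source, dest, state, chunk_size=4):
    """Copy source to dest in chunks, updating state after each chunk."""
    while True:
        chunk = source.read(chunk_size)
        if not chunk:
            break
        dest.write(chunk)
        # Another thread holding the same state object sees this immediately.
        state.bytes_downloaded += len(chunk)


state = State()
dest = io.BytesIO()
copy_with_progress(io.BytesIO(b"0123456789"), dest, state)
print(state.bytes_downloaded)  # prints 10
```

Passing a plain integer instead of an object would not work here, because rebinding the integer inside the function would be invisible to the other threads.
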
conductor.lib.downloader2.prepare_dest_dirpath(dir_path)

Prepare the destination directory, ensuring that it exists and has open permissions.

conductor.lib.downloader2.random_exeption(percentage_chance)
conductor.lib.downloader2.report_error(self, download_id, error_message)
conductor.lib.downloader2.run_downloader(args)

Start the downloader. If one or more job ids were given, exit the downloader upon completion. Otherwise, run the downloader indefinitely (daemon mode), polling the Conductor cloud app for files that need to be downloaded.

conductor.lib.downloader2.safe_mkdirs(dirpath)

Create the given directory. If it already exists, suppress the exception. This function is useful when handling concurrency issues where it’s not possible to reliably check whether a directory exists before creating it.
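
The usual pattern for this is to attempt the creation and tolerate the "already exists" error, rather than checking first and racing against other threads. The sketch below shows one way to write it; it is an assumption about the approach, not the module's source.

```python
import errno
import os


def safe_mkdirs(dirpath):
    """Create dirpath (and parents); ignore the error if it already exists."""
    try:
        os.makedirs(dirpath)
    except OSError as error:
        # Suppress only "already exists as a directory"; re-raise the rest.
        if error.errno != errno.EEXIST or not os.path.isdir(dirpath):
            raise
```

On Python 3, os.makedirs(dirpath, exist_ok=True) gives similar behavior without the explicit try/except.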

conductor.lib.downloader2.set_logging(level=None, log_dirpath=None)