conductor.lib.downloader module

Command-line process for running downloads.

class conductor.lib.downloader.Backend
classmethod bearer_token(**kw)
classmethod fail(**kw)
classmethod finish(**kw)
classmethod get(**kwargs)

Return a list of items

classmethod headers()
static make_url(path)

TODO: remove this hardcoding.

classmethod next(**kw)

Return the next download (dict), or None if there isn’t one.

classmethod post(**kwargs)
classmethod put(**kwargs)
classmethod touch(**kw)
class conductor.lib.downloader.Counter(value=0)

Bases: object

Acts as a mutable integer variable, so that additions and subtractions modify the same object (every reference to it sees the updated value).

Use the .value attribute to read and write the value.

value
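
A minimal sketch of the pattern described above (illustrative only, not the module's actual code; add_bytes is a hypothetical helper). Python ints are immutable, so a function cannot change a caller's int in place, but it can update an attribute on a shared object:

```python
class Counter(object):
    """Mutable integer holder; read and write it through .value."""

    def __init__(self, value=0):
        self.value = value


def add_bytes(counter, nbytes):
    # Hypothetical helper: mutates the shared object, so every holder of
    # `counter` sees the new total.
    counter.value += nbytes


total = Counter()
add_bytes(total, 1024)
add_bytes(total, 512)
print(total.value)  # 1536
```

Note that such an object is shared only within one process; a counter shared across processes would need something like multiprocessing.Value.
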
class conductor.lib.downloader.DecAuthorize

Bases: object

Decorator that adds an authentication header to the wrapped function’s “headers” argument. When a call fails with a 401 error, it automatically renews the token and retries the function.
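
A sketch of how such a decorator can work, under stated assumptions: unlike the documented class, this version takes a hypothetical get_token(refresh=False) callable in its constructor so it is self-contained, and HttpError is a hypothetical exception carrying the HTTP status code. It retries at most once, and only on a 401:

```python
import functools


class HttpError(Exception):
    """Hypothetical HTTP error that carries the response status code."""

    def __init__(self, status):
        super(HttpError, self).__init__("HTTP %d" % status)
        self.status = status


class DecAuthorize(object):
    """Sketch: inject a bearer header; on a 401, refresh the token and retry once."""

    def __init__(self, get_token):
        # get_token(refresh=False) -> str is a hypothetical token source.
        self.get_token = get_token

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            headers = dict(kwargs.pop("headers", None) or {})
            for refresh in (False, True):
                token = self.get_token(refresh=refresh)
                headers["Authorization"] = "Bearer %s" % token
                try:
                    return func(*args, headers=headers, **kwargs)
                except HttpError as err:
                    # Retry only an auth failure, and only once (with a fresh token).
                    if err.status != 401 or refresh:
                        raise

        return wrapper


def fake_token(refresh=False):
    return "fresh" if refresh else "stale"


@DecAuthorize(fake_token)
def fetch(url, headers=None):
    # Pretend the server rejects the stale token.
    if headers["Authorization"] != "Bearer fresh":
        raise HttpError(401)
    return "ok from " + url


print(fetch("/downloads/next"))  # ok from /downloads/next
```
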

class conductor.lib.downloader.DecDownloaderRetry(run_value, retry_exceptions=<type 'exceptions.Exception'>, skip_exceptions=(), tries=8, static_sleep=None)

Bases: conductor.lib.common.DecRetry

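Downloader-specific variant of DecRetry whose sleep between retries can be interrupted when the downloader is shutting down.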

sleep(seconds)

Sleep for the given number of seconds.

Instead of doing one long sleep call, we make a loop of many short sleep calls. This gives the opportunity to check the running state, and exit the sleep process if necessary.
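
The chunked-sleep idea can be sketched as below. This is an illustration, not the class's actual method: interruptible_sleep, is_running and step are hypothetical names, and the return value (True if the full duration elapsed, False if interrupted) is an assumption that lets a retry loop stop early:

```python
import time


def interruptible_sleep(seconds, is_running, step=0.1):
    """Sleep for `seconds`, waking every `step` seconds to check `is_running()`.

    Returns True if the full duration elapsed, False if interrupted.
    """
    deadline = time.time() + seconds
    while True:
        remaining = deadline - time.time()
        if remaining <= 0:
            return True
        if not is_running():
            return False
        # Never sleep past the deadline, and never longer than one step.
        time.sleep(min(step, remaining))
```

A smaller step makes shutdown more responsive at the cost of more wake-ups.
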

class conductor.lib.downloader.DownloadWorker(run_state, result_queue, account, log_interval=5, output_dir=None, project=None, location=None)

Bases: multiprocessing.process.Process

A multiprocessing.Process worker class that is responsible for:

  • Asking the downloader service for the next download in the queue.
  • Streaming the downloadable file to disk (or checking whether it already exists).
  • Periodically reporting progress to the service.
  • Recording the results of each file download and logging history summaries.
  • Notifying the service of file completion and failure (when possible).

The parent/calling process uses a process-safe multiprocessing.Array object to tell this process when it needs to shut down, clean up, and exit.

Available state values:

  • “running”: All systems go.
  • “stopping”: Stop everything and cleanup.

The DownloadWorker process has a child thread of its own (TouchWorker), which is responsible for communicating the progress of the current file download to the Conductor backend. It, too, uses a multiprocessing.Array object to signal when it should stop.
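
A sketch of how a multiprocessing.Array can carry the run-state string, assuming a char array ("c" typecode) is used; make_run_state, read_state, set_state and worker_loop are hypothetical helpers, and in real use the array would be passed to the child through its Process constructor. The state values match the documented STATE_RUNNING/STATE_STOPPING:

```python
import multiprocessing

STATE_RUNNING = "running"
STATE_STOPPING = "stopping"


def make_run_state(value=STATE_RUNNING, size=16):
    # Shared, lock-protected char buffer that can be handed to a child process.
    state = multiprocessing.Array("c", size)
    state.value = value.encode("ascii")
    return state


def read_state(state):
    # .value returns the bytes up to the first NUL; the wrapper takes the lock.
    return state.value.decode("ascii")


def set_state(state, value):
    state.value = value.encode("ascii")


def worker_loop(run_state, do_one_unit):
    # Keep working until the parent flips the shared state to "stopping".
    while read_state(run_state) == STATE_RUNNING:
        do_one_unit()


state = make_run_state()
units = []


def one_unit():
    units.append(1)
    if len(units) == 3:  # simulate the parent asking for shutdown
        set_state(state, STATE_STOPPING)


worker_loop(state, one_unit)
print(len(units), read_state(state))  # 3 stopping
```
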

download(**kwargs)

The “outer” download function, which wraps the “real” download function in retries (via a decorator) and md5 verification. Any exception that is encountered is retried automatically, except for DownloaderExit.
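
A simplified sketch of the retry-plus-verification pattern, assuming the expected checksum is available as an md5 hex digest. download_with_retries and md5_of_file are hypothetical names, DownloaderExit is a local stand-in, and the pause between attempts that the real retry decorator inserts is omitted (tries=8 mirrors the DecDownloaderRetry default):

```python
import hashlib
import os
import tempfile


class DownloaderExit(Exception):
    """Stand-in for the module's DownloaderExit: never retried."""


def md5_of_file(path, chunk_size=1024 * 1024):
    # Hash in chunks so large files never have to fit in memory.
    md5 = hashlib.md5()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            md5.update(chunk)
    return md5.hexdigest()


def download_with_retries(real_download, path, expected_md5, tries=8):
    """Run real_download(path) until the file's md5 matches, at most `tries` times."""
    last_error = None
    for _ in range(tries):
        try:
            real_download(path)
            actual = md5_of_file(path)
            if actual == expected_md5:
                return path
            last_error = ValueError("md5 mismatch: %s != %s" % (actual, expected_md5))
        except DownloaderExit:
            raise  # a shutdown request is never retried
        except Exception as err:  # anything else is retried
            last_error = err
    raise last_error


# Usage: a fake download that writes corrupt data on its first attempt.
attempts = []


def flaky_download(path):
    attempts.append(path)
    with open(path, "wb") as fh:
        fh.write(b"corrupt" if len(attempts) == 1 else b"hello")


target = os.path.join(tempfile.mkdtemp(), "file.bin")
download_with_retries(flaky_download, target, hashlib.md5(b"hello").hexdigest())
print(len(attempts))  # 2
```
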

run()

Overrides Process.run(); it is called by the inherited start() method.

This serves as the outer/upper wrapper function which is responsible for:

  1. Creating and starting any child workers of its own (TouchWorker, etc).
  2. Calling the “inner” run function to continually query for and download files that are pending download.
  3. Monitoring the run_state value to determine whether to exit the loop or not.
  4. Catching all unexpected exceptions and preventing the process from dying
  5. Catching any DownloaderExit exceptions to break out of the while loop
  6. Calling the _stop method to properly stop any/all child processes/threads.

To shut down the Downloader cleanly, this method must exit normally (i.e. reach the end of the method); don’t allow any exception to propagate out of it.
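
The responsibilities listed above reduce to a loop like the following sketch. outer_run and its is_running, run_once and stop callables are hypothetical stand-ins for the run-state check, the “inner” run function and _stop; DownloaderExit is a local stand-in:

```python
import logging

logger = logging.getLogger(__name__)


class DownloaderExit(Exception):
    """Stand-in for the module's DownloaderExit exception."""


def outer_run(is_running, run_once, stop):
    """Loop until told to stop; never let an exception escape."""
    while is_running():
        try:
            run_once()
        except DownloaderExit:
            break  # deliberate exit: leave the loop
        except Exception:
            # Log and keep going: an unexpected error must not kill the worker.
            logger.exception("Unexpected error in download loop")
    stop()  # always reached, so child workers are signalled and joined


events = []
calls = {"n": 0}


def run_once():
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient failure")  # logged, loop continues
    if calls["n"] == 3:
        raise DownloaderExit()  # ends the loop


outer_run(lambda: True, run_once, lambda: events.append("stopped"))
print(calls["n"], events)  # 3 ['stopped']
```
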

class conductor.lib.downloader.Downloader(args)

Bases: object

Top-level Downloader class that maintains all worker processes/threads and terminates them via shared state objects when a SIGINT (KeyboardInterrupt) is received.

Below is the structure of the Downloader and all of its child processes/threads:

Downloader                    # Main process
    |
    |_DownloadWorker-1        # Process
    |    |_TouchWorker-1      # Thread
    |
    |_DownloadWorker-2        # Process
    |    |_TouchWorker-2      # Thread
    |
    |_DownloadWorker-N        # Process
    |    |_TouchWorker-N      # Thread
    |
    |_HistoryWorker           # Process

Notes

For brevity, the rest of this documentation refers to threads as “processes” as well.

To shut down these child processes, each process (including each one’s own child processes) has a corresponding multiprocessing.Array object that its parent uses to signal it to shut down. Although a process could be terminated simply by raising an exception, we want each process to terminate cleanly (releasing resources and signaling its own child processes). Therefore, every child process should exit only by returning from its “run” method (don’t raise an exception!).

Also, every parent should “join” each of its child processes before exiting itself. This ensures that all child processes have indeed terminated. Otherwise, children can be left behind as zombies (exited but never reaped) or orphans (still running after their parent has exited).

Life-cycle of a Process:

  1. A process (Process1) is started by its parent process (Parent1). Parent1 retains a shared object to signal run-state changes to Process1.
  2. Process1 creates and starts its own child process (Child1).
  3. Process1 runs indefinitely (looping within its “run” method), constantly checking the status of its run-state object.
  4. A KeyboardInterrupt (SIGINT) is triggered by the user and handled by Parent1. Parent1 signals Process1 to shut down (by changing the value of the run-state object).
  5. Process1 detects that the run-state value has changed and calls its own “_stop” method.
  6. The _stop method changes the run-state object for the Child1 process, triggering Child1 to go through the same shutdown logic (step 5).
  7. Before returning from its “run” method, Process1 “joins” the Child1 process so that it blocks (and waits for Child1 to exit).
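
The life-cycle above can be sketched with threads (which the notes treat like processes) and one multiprocessing.Array run-state object per child. Node, new_run_state and _stop_children are hypothetical names, and step 4 is simulated by writing the state directly instead of handling a real SIGINT:

```python
import multiprocessing
import threading
import time

RUNNING, STOPPING = b"running", b"stopping"


def new_run_state():
    state = multiprocessing.Array("c", 16)  # shared run-state object
    state.value = RUNNING
    return state


class Node(threading.Thread):
    """Hypothetical worker that follows the life-cycle steps above."""

    def __init__(self, name, run_state, events, child_names=()):
        super(Node, self).__init__(name=name)
        self.run_state = run_state
        self.events = events
        self.child_names = child_names
        self.children = []

    def run(self):
        # Step 2: create and start children, each with its own run-state object.
        for child_name in self.child_names:
            child = Node(child_name, new_run_state(), self.events)
            self.children.append(child)
            child.start()
        # Step 3: loop until the parent changes our run state.
        while self.run_state.value == RUNNING:
            time.sleep(0.01)
        # Steps 5-7: signal the children, join them, then return normally.
        self._stop_children()
        self.events.append(self.name + " exited")

    def _stop_children(self):
        # Named _stop_children to avoid clashing with threading.Thread's private methods.
        for child in self.children:
            child.run_state.value = STOPPING
        for child in self.children:
            child.join()


events = []
root_state = new_run_state()
process1 = Node("Process1", root_state, events, child_names=("Child1",))
process1.start()
time.sleep(0.1)
root_state.value = STOPPING  # Step 4: the parent signals shutdown
process1.join()
print(events)  # ['Child1 exited', 'Process1 exited']
```

Because each parent joins its children before appending its own event, the child always reports first.
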

RESULTS_MAX = 256
STATE_RUNNING = 'running'
STATE_STOPPING = 'stopping'
log_uptime()

Return the amount of time that the downloader has been running, e.g. “0:01:28”.
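
One simple way to produce that format, assuming the start time was recorded with time.time() at startup (format_uptime and its now parameter are hypothetical): str() of a datetime.timedelta already renders as H:MM:SS.

```python
import datetime
import time


def format_uptime(start_time, now=None):
    """Return elapsed wall time since `start_time` as H:MM:SS, e.g. "0:01:28"."""
    now = time.time() if now is None else now
    elapsed = int(now - start_time)  # drop sub-second noise
    return str(datetime.timedelta(seconds=elapsed))


print(format_uptime(1000.0, now=1088.0))  # 0:01:28
```

Past 24 hours the same call yields strings such as “1 day, 1:00:00”.
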

sigint_handler(sig, frm)

Handles the SIGINT signal (i.e. the KeyboardInterrupt python exception).

This simply calls the Downloader’s stop_daemon method.

Note that when this function is registered as the SIGINT handler, it also gets called in ALL child processes. Because of this, it checks which process is calling it and only executes in the main (parent) process.
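
Pressing Ctrl-C sends SIGINT to every process in the terminal's foreground process group, and with the fork start method children inherit the parent's handler, so the guard matters. A sketch of the check, using multiprocessing.current_process().name as one possible way to tell the parent apart (make_sigint_handler is a hypothetical factory):

```python
import multiprocessing
import signal


def make_sigint_handler(stop_daemon):
    """Build a SIGINT handler that only acts in the main (parent) process."""

    def sigint_handler(sig, frm):
        # Children receive the same signal; let only the parent coordinate shutdown.
        if multiprocessing.current_process().name != "MainProcess":
            return
        stop_daemon()

    return sigint_handler


calls = []
handler = make_sigint_handler(lambda: calls.append("stop"))
# In real use: signal.signal(signal.SIGINT, handler)
handler(signal.SIGINT, None)  # simulate delivery in the main process
print(calls)  # ['stop']
```
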

start_daemon()

Start the downloader daemon: create all child worker processes, start them, and join them. This is a blocking function that will not return until all workers have exited.

stop_daemon()

Stop the downloader daemon and exit the application.

  1. Change the run_state value of each worker’s shared object. This triggers each worker’s stop/cleanup behavior.
  2. Exit the application runtime by raising an exception (this is needed to break out of the join that occurred in the start_daemon method).

class conductor.lib.downloader.HistoryWorker(run_state, results_queue, print_interval=10, history_max=100, worker_type='download', column_names=None)

Bases: multiprocessing.process.Process

A process that periodically prints/logs the “history” of the last N files that have been downloaded/uploaded.
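
The bounded history can be sketched with a collections.deque, which discards its oldest entry once maxlen is reached. History and its default column names are hypothetical (the documented worker defaults column_names to None), and the periodic printing every print_interval seconds is left out:

```python
import collections


class History(object):
    """Keep the last `history_max` results and render them as a text table."""

    def __init__(self, history_max=100, column_names=("file", "size", "status")):
        # A bounded deque silently discards the oldest entry once full.
        self.rows = collections.deque(maxlen=history_max)
        self.column_names = list(column_names)

    def add(self, result):
        self.rows.append(result)

    def render(self):
        cells = [[str(row.get(name, "")) for name in self.column_names] for row in self.rows]
        widths = [max([len(name)] + [len(r[i]) for r in cells])
                  for i, name in enumerate(self.column_names)]

        def fmt(values):
            return "  ".join(v.ljust(w) for v, w in zip(values, widths)).rstrip()

        return "\n".join([fmt(self.column_names)] + [fmt(r) for r in cells])


history = History(history_max=2)
history.add({"file": "a.exr", "size": 10, "status": "downloaded"})
history.add({"file": "b.exr", "size": 2048, "status": "skipped"})
history.add({"file": "c.exr", "size": 7, "status": "failed"})
print(history.render())
```
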

run()

Start and return a thread that is responsible for pinging the app for files to transfer (and populating the queue)

class conductor.lib.downloader.TouchWorker(run_state, progress_queue, interval=10, process_name='', account=None, project=None, location=None)

Bases: threading.Thread

A Thread that periodically reports (“touches”) file progress to the Conductor backend. Besides reporting the byte progress of a file, it also serves as a “heartbeat” for a file download, telling the backend that the file is still in progress and should not be considered a stranded/dead download.

The frequency of the reporting is dictated by two things:

  1. The self._interval variable (seconds). A potential “touch” occurs every _interval seconds.
  2. However, the touch is only executed if there is data in the progress_queue. If there is no data in the progress queue, no “touch” is issued. The progress queue should have data constantly streaming through it (while the file is being either md5-hashed or downloaded). If no data is being put into it, the DownloadWorker has hung on that file for some reason. Eventually the backend will “reset” that download because it will not have been touched for a few minutes.

run()

Perpetually report the file progress to the app for a given interval.

Report on the last entry found in the progress queue. If there are no entries, do not report anything. We want the backend to reset any “stranded”/“stuck” downloads, so only report progress when there IS progress. Progress entries are only added to the queue while a file is actively being hashed or downloaded.
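
A single touch cycle could look like the sketch below: drain whatever arrived since the last interval, keep only the newest entry, and stay silent if nothing arrived. touch_once and report are hypothetical; the real thread would call something like this every _interval seconds while its run state is “running”:

```python
import queue  # named "Queue" on Python 2


def touch_once(progress_queue, report):
    """Report only the newest progress entry; report nothing if the queue is empty."""
    latest = None
    while True:
        try:
            latest = progress_queue.get_nowait()
        except queue.Empty:
            break
    if latest is not None:
        report(latest)  # the "touch" doubles as a heartbeat
    return latest


q = queue.Queue()
for nbytes in (1024, 2048, 4096):
    q.put(nbytes)
sent = []
touch_once(q, sent.append)  # reports 4096 only
touch_once(q, sent.append)  # queue empty: no touch, so a hung download goes stale
print(sent)  # [4096]
```
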

conductor.lib.downloader.empty_queue(queue)

Remove and return all items from the given Queue object
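
A minimal sketch of such a helper (not necessarily the module's implementation). Looping on get_nowait() until queue.Empty avoids relying on qsize()/empty(), which are only approximate for multiprocessing queues (and qsize() raises NotImplementedError on macOS):

```python
import queue  # named "Queue" on Python 2


def empty_queue(q):
    """Remove and return all items currently in `q`, oldest first."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


q = queue.Queue()
for item in ("a", "b", "c"):
    q.put(item)
print(empty_queue(q))  # ['a', 'b', 'c']
```
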

conductor.lib.downloader.get_bearer_token(refresh=False)

Return the bearer token from a cached (global) variable. If there is no cached value, fetch a new bearer token, cache it, and return it.

Note that BEARER_TOKEN is not a simple string; it’s a process/thread-safe object.
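
A thread-safe caching sketch, under stated assumptions: the fetch_token callable is a hypothetical parameter (the documented function takes only refresh), and a threading.Lock protects threads within one process only; sharing the token across processes, as the note above implies, would need a shared-memory or manager object instead:

```python
import threading

_TOKEN_LOCK = threading.Lock()
_BEARER_TOKEN = None


def get_bearer_token(fetch_token, refresh=False):
    """Return the cached token, fetching (and caching) a new one if needed."""
    global _BEARER_TOKEN
    # Holding the lock makes check-then-fetch atomic, so concurrent callers
    # trigger at most one fetch.
    with _TOKEN_LOCK:
        if refresh or _BEARER_TOKEN is None:
            _BEARER_TOKEN = fetch_token()
        return _BEARER_TOKEN


fetches = {"n": 0}


def fake_fetch():
    fetches["n"] += 1
    return "token-%d" % fetches["n"]


print(get_bearer_token(fake_fetch))                # token-1 (fetched)
print(get_bearer_token(fake_fetch))                # token-1 (cached)
print(get_bearer_token(fake_fetch, refresh=True))  # token-2 (forced refresh)
```
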

conductor.lib.downloader.make_auth_header(bearer_token)

Create and return a dictionary which contains the authorization token info

conductor.lib.downloader.run_downloader(args)

Run the downloader with the given arguments.

conductor.lib.downloader.safe_mkdirs(dirpath)

Create the given directory. If it already exists, suppress the exception. This function is useful when handling concurrency issues where it’s not possible to reliably check whether a directory exists before creating it.
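
A portable sketch of the idea: attempt the creation and treat “already exists” as success, rather than checking first (which races with other processes). On Python 3.2+ os.makedirs(dirpath, exist_ok=True) does the same:

```python
import errno
import os
import tempfile


def safe_mkdirs(dirpath):
    """Create dirpath (and parents); tolerate another process creating it first."""
    try:
        os.makedirs(dirpath)
    except OSError as err:
        # Only swallow "already exists", and only if it really is a directory.
        if err.errno != errno.EEXIST or not os.path.isdir(dirpath):
            raise


target = os.path.join(tempfile.mkdtemp(), "a", "b")
safe_mkdirs(target)
safe_mkdirs(target)  # second call is a harmless no-op
print(os.path.isdir(target))  # True
```
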

conductor.lib.downloader.set_logging(level=None, log_dirpath=None)