conductor.lib.downloader module¶
Command Line Process to run downloads.
-
class
conductor.lib.downloader.
Backend
¶ -
classmethod
bearer_token
(**kw)¶
-
classmethod
fail
(**kw)¶
-
classmethod
finish
(**kw)¶
-
classmethod
get
(**kwargs)¶ Return a list of items
-
classmethod
headers
()¶
-
static
make_url
(path)¶ TODO: get rid of this hardcoding!!!
-
classmethod
next
(**kw)¶ Return the next download (dict), or None if there isn’t one.
-
classmethod
post
(**kwargs)¶
-
classmethod
put
(**kwargs)¶
-
classmethod
touch
(**kw)¶
-
classmethod
-
class
conductor.lib.downloader.
Counter
(value=0)¶ Bases:
object
Acts as a mutable integer variable, so that adding/subtracting can be done on the same object (retaining references to the same object/variable in memory).
Use the .value attribute to read and write the value.
-
value
¶
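The value of a mutable holder is easiest to see when a helper function updates a total owned by its caller. The following is a minimal sketch, not the library's code: the class body and the `add_bytes` helper are assumptions written for illustration.

```python
class Counter(object):
    """Hypothetical re-creation of a mutable integer holder."""

    def __init__(self, value=0):
        self.value = value


def add_bytes(counter, amount):
    # Mutates the shared object, so the caller sees the change.
    # A plain int argument would only be rebound locally.
    counter.value += amount


total = Counter()
add_bytes(total, 1024)
add_bytes(total, 512)
print(total.value)  # 1536
```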
-
-
class
conductor.lib.downloader.
DecAuthorize
¶ Bases:
object
Decorator that adds an authentication header to the wrapped function’s “headers” argument. Automatically renews tokens when encountering 401 errors and retries the function.
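As a rough illustration of the pattern described above, the sketch below injects a header and retries once after a 401. Everything in it is an assumption made for the example: the `Unauthorized` exception, the `fetch_token` helper, the `_token` cache, the `Bearer` header format, and the single-retry policy are hypothetical and not taken from the module.

```python
import functools


class Unauthorized(Exception):
    """Stand-in for an HTTP 401 error (hypothetical)."""


_token = {"value": None}  # hypothetical module-level token cache


def fetch_token():
    """Hypothetical fetcher; a real one would call an auth service."""
    fetch_token.calls += 1
    return "token-%d" % fetch_token.calls


fetch_token.calls = 0


def authorize(func):
    """Inject an Authorization header and retry once after a 401."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        for attempt in range(2):
            if _token["value"] is None:
                _token["value"] = fetch_token()
            headers = dict(kwargs.get("headers") or {})
            headers["Authorization"] = "Bearer %s" % _token["value"]
            kwargs["headers"] = headers
            try:
                return func(*args, **kwargs)
            except Unauthorized:
                if attempt == 1:
                    raise
                _token["value"] = None  # force a renewal on the next attempt
    return wrapper


@authorize
def get_items(headers=None):
    # Pretend the first token has expired on the server side.
    if headers["Authorization"] == "Bearer token-1":
        raise Unauthorized()
    return headers["Authorization"]


result = get_items()
```

Here the first call fails with the stale token, the cache is cleared, and the retry succeeds with a freshly fetched one.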
-
class
conductor.lib.downloader.
DecDownloaderRetry
(run_value, retry_exceptions=<type 'exceptions.Exception'>, skip_exceptions=(), tries=8, static_sleep=None)¶ Bases:
conductor.lib.common.DecRetry
Some Docs.
-
sleep
(seconds)¶ Sleep for the given number of seconds.
Instead of doing one long sleep call, we make a loop of many short sleep calls. This gives the opportunity to check the running state, and exit the sleep process if necessary.
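A sleep built from many short sleeps might look like the sketch below. The function name, the `is_running` callback, and the `step` size are assumptions for illustration; the actual method presumably checks the decorator's run value instead.

```python
import time


def interruptible_sleep(seconds, is_running, step=0.05):
    """Sleep up to `seconds`, returning early if is_running() turns False.

    Returns True if the full time elapsed, False if interrupted.
    """
    deadline = time.time() + seconds
    while True:
        if not is_running():
            return False
        remaining = deadline - time.time()
        if remaining <= 0:
            return True
        # Never sleep longer than one step, so a state change is noticed fast.
        time.sleep(min(step, remaining))
```

With this shape, a worker that is asked to stop during a long retry back-off reacts within one `step` instead of waiting out the whole delay.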
-
-
class
conductor.lib.downloader.
DownloadWorker
(run_state, result_queue, account, log_interval=5, output_dir=None, project=None, location=None)¶ Bases:
multiprocessing.process.Process
A multiprocessing.Process worker class that is responsible for:
- Asking the downloader service for the next download in the queue.
- Streaming the downloadable file to disk (or checking whether it already exists).
- Periodically reporting progress to the service.
- Recording the results of each file download and logging history summaries.
- Notifying the service of file completion and failure (when possible).
The parent/calling process uses a process-safe multiprocessing.Array object to tell this process when it needs to shut down, clean up, or exit.
Available state values:
- “running”: All systems go.
- “stopping”: Stop everything and cleanup.
The DownloadWorker process has a child thread of its own (TouchWorker), which is responsible for communicating the progress of the current file download to the conductor backend. It, too, uses a multiprocessing.Array object to signal when it should be stopped.
-
download
(**kwargs)¶ The “outer” download function that wraps the “real” download function in retries (decorator) and md5 verification. All exceptions that are encountered are retried automatically, except for DownloaderExit.
-
run
()¶ Overloaded method that is called by the inherited start() method.
This serves as the outer/upper wrapper function which is responsible for:
- Creating and starting any child workers of its own (TouchWorker, etc).
- Calling the “inner” run function to continually query for and download files that are pending download.
- Monitoring the run_state value to determine whether to exit the loop or not.
- Catching all unexpected exceptions and preventing the process from dying.
- Catching any DownloaderExit exceptions to break out of the while loop.
- Calling the _stop method to properly stop any/all child processes/threads.
In order to shut down the Downloader in general, it’s important that this method exits properly (reaches the end of the method), i.e. don’t allow any exceptions to be raised.
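The responsibilities listed for this method can be sketched as a loop that swallows unexpected errors, breaks on an exit exception, and always stops its children. This is only an outline under stated assumptions: `run_loop` and its four callback parameters are hypothetical, and the `DownloaderExit` class here is a local stand-in.

```python
class DownloaderExit(Exception):
    """Local stand-in for the exception used to break out of the loop."""


def run_loop(is_running, do_one_download, stop_children, log):
    """Outer loop: no exception escapes, and children are always stopped."""
    try:
        while is_running():
            try:
                do_one_download()
            except DownloaderExit:
                break  # deliberate request to leave the loop
            except Exception as error:
                log("unexpected error: %s" % error)  # keep the worker alive
    finally:
        stop_children()


# Tiny demonstration with scripted outcomes.
events = []
jobs = iter([None, ValueError("boom"), DownloaderExit()])


def fake_download():
    job = next(jobs)
    if job is not None:
        raise job
    events.append("downloaded")


run_loop(lambda: True, fake_download,
         lambda: events.append("stopped"), events.append)
```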
-
class
conductor.lib.downloader.
Downloader
(args)¶ Bases:
object
Top-level Downloader class that maintains all worker processes/threads and terminates them using shared state objects (signaled via SIGINT, i.e. KeyboardInterrupt).
Below is the structure of the Downloader and all of its child processes/threads:
Downloader               # Main process
|
|_DownloadWorker-1       # Process
|   |_TouchWorker-1      # Thread
|
|_DownloadWorker-2       # Process
|   |_TouchWorker-2      # Thread
|
|_DownloadWorker-N       # Process
|   |_TouchWorker-N      # Thread
|
|_HistoryWorker          # Process
Notes
For the sake of brevity, we’ll refer to “threads” as “processes” for the rest of this documentation.
In order to shut down these child processes, each process (including their own child processes) has a corresponding multiprocessing.Array object that the parent uses to signal the child to shut down. Though processes can be terminated by having each one simply throw an exception, we want to make sure that each process terminates cleanly (releasing resources and signaling to their own child processes). Therefore, every child process should only exit by returning from its “run” method (don’t raise an exception!).
Also, every parent should “join” any of its child processes before exiting itself. This ensures that all child processes have indeed terminated properly. Otherwise this could lead to “zombie” processes (no parents).
Life-cycle of a Process:
1. A process (Process1) is started by a parent process (Parent1).
2. Parent1 retains a shared object to signal run-state changes to Process1.
3. Process1 creates and starts its own child process (Child1).
4. Process1 runs indefinitely (looping within its “run” method), constantly checking the status of its run-state object.
5. A KeyboardInterrupt (SIGINT) is triggered by the user, and is handled by Parent1.
6. Parent1 signals to Process1 to shut down (by changing the value of the run-state object).
7. Process1 detects that the run-state value has changed, and calls its own “_stop” method.
8. The stop method changes the run-state object for the Child1 process, triggering Child1 to go through the same shutdown logic (step 7).
9. Before returning from the “run” method, Process1 “joins” the Child1 process so that it blocks (and waits for Child1 to exit).
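The life-cycle above can be sketched with a shared character array as the run-state object. This is a simplified illustration, not the module's code: threads stand in for processes to keep it short, and the `Worker` class, `new_run_state` helper, array size, and byte-string state values are assumptions.

```python
import multiprocessing
import threading
import time

STATE_RUNNING = b"running"
STATE_STOPPING = b"stopping"


def new_run_state():
    # A shared char array large enough to hold either state string.
    state = multiprocessing.Array("c", 16)
    state.value = STATE_RUNNING
    return state


class Worker(threading.Thread):
    """Loops until its run-state flips, then stops and joins its child."""

    def __init__(self, run_state, child=None):
        super(Worker, self).__init__()
        self.run_state = run_state
        self.child = child
        self.exited_cleanly = False

    def run(self):
        if self.child is not None:
            self.child.start()
        while self.run_state.value == STATE_RUNNING:
            time.sleep(0.01)  # real work would happen here
        if self.child is not None:
            self.child.run_state.value = STATE_STOPPING  # signal the child
            self.child.join()  # wait for the child before returning
        self.exited_cleanly = True  # reached the end of run() normally


child1 = Worker(new_run_state())
process1 = Worker(new_run_state(), child=child1)
process1.start()
process1.run_state.value = STATE_STOPPING  # what Parent1 would do on SIGINT
process1.join()
```

Because each worker joins its child before returning, joining `process1` is enough for the parent to know that the whole subtree has exited.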
The parent/calling process uses a process-safe multiprocessing.Array object to tell this process when it needs to shut down, clean up, or exit.
Available state values:
- “running”: All systems go.
- “stopping”: Stop everything and cleanup.
The DownloadWorker process has a child thread of its own (TouchWorker), which is responsible for communicating the progress of the current file download to the conductor backend. It, too, uses a multiprocessing.Array object to signal when it should be stopped.
-
RESULTS_MAX
= 256¶
-
STATE_RUNNING
= 'running'¶
-
STATE_STOPPING
= 'stopping'¶
-
log_uptime
()¶ Return the amount of time that the downloader has been running, e.g. “0:01:28”.
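One way to produce a string in that shape is to let `datetime.timedelta` do the formatting. The `format_uptime` helper and its explicit `start_time`/`now` arguments are assumptions for this sketch; the actual method takes no arguments and presumably reads a stored start time.

```python
import datetime


def format_uptime(start_time, now):
    """Return elapsed time as H:MM:SS, dropping fractional seconds."""
    elapsed = now - start_time
    whole_seconds = int(elapsed.total_seconds())
    return str(datetime.timedelta(seconds=whole_seconds))


started = datetime.datetime(2020, 1, 1, 12, 0, 0)
current = datetime.datetime(2020, 1, 1, 12, 1, 28, 500000)
uptime = format_uptime(started, current)  # "0:01:28"
```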
-
sigint_handler
(sig, frm)¶ Handles the SIGINT signal (i.e. the KeyboardInterrupt python exception).
This simply calls the Downloader’s stop_daemon method.
Note that when this function is registered as the SIGINT handler, it gets called by ALL child processes as well. Because of this, we check which process is calling it, and only execute it if it’s the main (parent) process.
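A handler that ignores calls from child processes could be sketched as follows. Comparing process IDs is an assumption made for this example (the module may identify the main process differently), and `make_sigint_handler`, `MAIN_PID`, and the `stop_daemon` callback parameter are hypothetical names.

```python
import os
import signal

MAIN_PID = os.getpid()  # recorded once, in the parent, before any fork


def make_sigint_handler(stop_daemon, main_pid=MAIN_PID):
    """Build a handler that only acts when running in the main process."""
    def handler(sig, frm):
        # Forked children inherit the handler; only the parent should act.
        if os.getpid() != main_pid:
            return
        stop_daemon()
    return handler


calls = []
in_parent = make_sigint_handler(lambda: calls.append("stopped"))
in_parent(signal.SIGINT, None)  # runs, because we are the main process

in_child = make_sigint_handler(lambda: calls.append("child"), main_pid=-1)
in_child(signal.SIGINT, None)  # ignored, the pid does not match
```

In a real program the handler would be installed from the main thread with `signal.signal(signal.SIGINT, in_parent)`.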
-
start_daemon
()¶ Start the downloader daemon. Create all child worker processes, start them, and join them. This is a blocking function and will not return until all workers have exited.
-
stop_daemon
()¶ Stop the downloader daemon and exit the application.
- Change the run_state value of each worker’s shared object. This will trigger each worker’s stop/cleanup behaviors.
- Exit the application runtime by raising an exception (we must break out of the join that occurred in the start_daemon method).
-
class
conductor.lib.downloader.
HistoryWorker
(run_state, results_queue, print_interval=10, history_max=100, worker_type='download', column_names=None)¶ Bases:
multiprocessing.process.Process
A process that periodically prints/logs the “history” of the last x files that have been downloaded/uploaded.
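Keeping only the last x results is commonly done with a bounded deque. Whether the class uses one is not stated here, so treat this as an illustrative sketch; the record shape and file names are made up.

```python
import collections

# A small limit for the demo; the documented default history_max is 100.
history = collections.deque(maxlen=3)

for name in ["a.exr", "b.exr", "c.exr", "d.exr"]:
    # Hypothetical result record; the oldest entry is dropped when full.
    history.append({"file": name, "status": "done"})

recent = [entry["file"] for entry in history]
print(recent)  # ['b.exr', 'c.exr', 'd.exr']
```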
-
run
()¶ Start and return a thread that is responsible for pinging the app for files to transfer (and populating the queue)
-
-
class
conductor.lib.downloader.
TouchWorker
(run_state, progress_queue, interval=10, process_name='', account=None, project=None, location=None)¶ Bases:
threading.Thread
A Thread that periodically reports/”touches” file progress to the Conductor backend. Aside from reporting the byte progress of a file, it also serves as a “heartbeat” for a file download, indicating to the backend that the file is still in progress and to not consider it a stranded/dead file download.
The frequency of the reporting is dictated by two things:
- The self._interval variable (seconds). A potential “touch” will occur every _interval seconds.
- However, the touch will only be executed if there is data in the progress_queue. If there is no data in the progress queue, then a “touch” will not be issued. The progress queue should have constant data streaming through it (either when md5 hashing or downloading the file). If there is no data getting put into it, then it means that the DownloadWorker has hung on that file for some reason. Eventually the backend will “reset” that download because it will not have been touched after a few minutes.
-
run
()¶ Perpetually report the file progress to the app for a given interval.
Report on the last entry found in the progress queue. If there are no entries, then do not report anything. We want the backend to reset any “stranded”/”stuck” downloads, so only report progress when there IS progress. Progress entries only get added to the queue when a file download is being actively hashed or downloaded.
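The "report only the last entry, or nothing" rule can be sketched like this. The `drain` and `touch_once` helpers and the `send` callback are assumptions for illustration; a standard-library `queue.Queue` (named `Queue` on Python 2) stands in for whatever queue type the worker actually receives.

```python
import queue


def drain(progress_queue):
    """Remove and return everything currently in the queue."""
    items = []
    while True:
        try:
            items.append(progress_queue.get_nowait())
        except queue.Empty:
            return items


def touch_once(progress_queue, send):
    """Report only the newest progress entry; stay silent if there is none."""
    entries = drain(progress_queue)
    if not entries:
        return False  # no progress, so let the backend's timeout apply
    send(entries[-1])
    return True


sent = []
progress = queue.Queue()
first = touch_once(progress, sent.append)   # nothing queued, nothing sent
progress.put(10)
progress.put(20)
second = touch_once(progress, sent.append)  # sends only the latest value
```

A periodic loop would call `touch_once` every interval, so a hung download simply stops producing touches.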
-
conductor.lib.downloader.
empty_queue
(queue)¶ Remove and return all items from the given Queue object
-
conductor.lib.downloader.
get_bearer_token
(refresh=False)¶ Return the bearer token from a cached (global) variable. If there is no cached value, then fetch a new bearer token, cache it, and return it.
Note that BEARER_TOKEN is not a simple string. It’s a process/thread-safe object.
-
conductor.lib.downloader.
make_auth_header
(bearer_token)¶ Create and return a dictionary which contains the authorization token info
-
conductor.lib.downloader.
run_downloader
(args)¶ Run the downloader with the given arguments.
-
conductor.lib.downloader.
safe_mkdirs
(dirpath)¶ Create the given directory. If it already exists, suppress the exception. This function is useful when handling concurrency issues where it’s not possible to reliably check whether a directory exists before creating it.
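A common way to get this behavior is to attempt the creation and ignore only the "already exists" error. The sketch below is one such approach, not necessarily how the module implements it; the name `safe_mkdirs_sketch` and the temporary path are hypothetical.

```python
import errno
import os
import tempfile


def safe_mkdirs_sketch(dirpath):
    """Create dirpath (and parents); tolerate it already existing."""
    try:
        os.makedirs(dirpath)
    except OSError as error:
        # Another process may have created it between a check and the create.
        # Re-raise anything else, or if the path exists but is not a directory.
        if error.errno != errno.EEXIST or not os.path.isdir(dirpath):
            raise


target = os.path.join(tempfile.mkdtemp(), "a", "b")
safe_mkdirs_sketch(target)
safe_mkdirs_sketch(target)  # second call is a no-op instead of an error
```

Attempting the creation first avoids the race that a separate "exists?" check would introduce when several workers write to the same output directory.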
-
conductor.lib.downloader.
set_logging
(level=None, log_dirpath=None)¶