conductor.lib.uploader module¶
class conductor.lib.uploader.FileStatWorker(*args, **kwargs)¶
Bases: conductor.lib.worker.ThreadWorker
This worker subscribes to a queue of (path, signed_upload_url) pairs.
For each item on the queue, it determines the size (in bytes) of the file to be uploaded and aggregates the total size for all uploads.
It then places the triplet (filepath, upload_url, byte_size) onto the out_queue.
The bytes_to_upload arg is used to hold the aggregated size of all files that need to be uploaded.
Note
This is stored as an int in order to pass it by reference, as it needs to be accessed and reset by the caller.
do_work(job, thread_int)¶
Parameters:	job (dict[str, str]) – filepath, signed_upload_url pairs. The FileStatWorker iterates through the dict. For each item, it aggregates the filesize in bytes and passes each pair as a tuple to the UploadWorker queue.
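For orientation, a rough sketch of this stat-and-forward step, assuming the worker exposes in_queue/out_queue attributes and a running total (the attribute name total_bytes below is hypothetical, not the actual implementation):

    import os

    def do_work(self, job, thread_int):
        # job: dict of {filepath: signed_upload_url} pairs (see Parameters above).
        for filepath, upload_url in job.items():
            byte_size = os.path.getsize(filepath)   # stat the file on disk
            self.total_bytes += byte_size           # aggregate size of all uploads (hypothetical attribute)
            # Forward the triplet for the UploadWorker to consume.
            self.out_queue.put((filepath, upload_url, byte_size))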
class conductor.lib.uploader.HttpBatchWorker(*args, **kwargs)¶
Bases: conductor.lib.worker.ThreadWorker
This worker receives a batched dict of (filename: md5) pairs and makes a batched HTTP API call, which returns a list of (filename: signed_upload_url) pairs for the files that need to be uploaded.
Each item in the returned list is added to the out_queue.
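A minimal sketch of what such a batched request might look like; the endpoint URL, payload shape, and auth handling here are illustrative assumptions, not the documented API:

    import requests

    def make_request(self, job):
        # job: {filename: md5} pairs for one batch.
        # The endpoint path and authorization scheme are assumptions.
        response = requests.post(
            "https://example.conductor.io/api/files/check",
            json={"files": job},
            headers={"Authorization": "Bearer <token>"},
        )
        response.raise_for_status()
        # Expected shape: a list of {filename: signed_upload_url} entries for
        # files the service does not already have.
        return response.json()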
do_work(job, thread_int)¶
This needs to be implemented for each worker type. The work task from the in_queue is passed as the job argument.
Returns the result to be passed to the out_queue.
make_request(job)¶
class conductor.lib.uploader.MD5OutputWorker(*args, **kwargs)¶
Bases: conductor.lib.worker.ThreadWorker
This worker batches the computed md5s into self.batch_size chunks. It will send a partial batch after waiting self.wait_time seconds.
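A sketch of the batch-or-timeout loop this describes, assuming a standard queue with a get timeout; the poison-pill sentinel value and the exact queue attribute names are assumptions:

    import queue

    def target(self, thread_int):
        self.batch = {}
        while True:
            try:
                job = self.in_queue.get(timeout=self.wait_time)
            except queue.Empty:
                # No new md5 arrived within self.wait_time seconds: ship a partial batch.
                self.ship_batch()
                continue
            if job is None:                    # poison-pill sentinel (assumed value)
                self.ship_batch()              # ship the last batch before terminating
                break
            self.batch.update(job)             # job: {filepath: md5}
            if len(self.batch) >= self.batch_size:
                self.ship_batch()

    def ship_batch(self):
        if self.batch:
            self.out_queue.put(self.batch)     # hand the batch to the HttpBatchWorker
            self.batch = {}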
check_for_poison_pill(job)¶
We need to make sure we ship the last batch before we terminate.
ship_batch()¶
target(thread_int)¶
class conductor.lib.uploader.MD5Worker(*args, **kwargs)¶
Bases: conductor.lib.worker.ThreadWorker
This worker pulls filenames from the in_queue and computes each file's base64-encoded md5, which is then added to the out_queue.
cache_file_info(file_info)¶
Store the given file_info into the database.
do_work(job, thread_int)¶
This needs to be implemented for each worker type. The work task from the in_queue is passed as the job argument.
Returns the result to be passed to the out_queue.
get_md5(filepath)¶
For the given filepath, return its md5.
Use the sqlite db cache to retrieve this (if the cache is valid); otherwise generate the md5 from scratch.
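A sketch of that cache-then-hash behaviour, assuming a hypothetical cache lookup helper (get_cached_file_info) and a modtime-based validity check; the real implementation may validate the cache differently:

    import base64
    import hashlib
    import os

    def get_md5(self, filepath):
        # Reuse the cached md5 only if the file's modtime still matches the
        # cached entry (get_cached_file_info is a hypothetical helper).
        cached = self.get_cached_file_info(filepath)
        if cached and cached.get("modtime") == os.path.getmtime(filepath):
            return cached["md5"]

        # Otherwise hash the file from scratch, reading in chunks to bound memory use.
        md5 = hashlib.md5()
        with open(filepath, "rb") as fh:
            for chunk in iter(lambda: fh.read(1024 * 1024), b""):
                md5.update(chunk)
        return base64.b64encode(md5.digest()).decode("utf-8")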
class conductor.lib.uploader.UploadWorker(*args, **kwargs)¶
Bases: conductor.lib.worker.ThreadWorker
This worker receives a (filepath: signed_upload_url) pair and performs an upload of the specified file to the provided url.
chunked_reader(filename)¶
do_upload(**kwargs)¶
Note that we don't rely on make_request's own retry mechanism because we need to recreate the chunked_reader generator before retrying the request. Instead, we wrap this method in a retry decorator.
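To illustrate why the retry must wrap the whole method, here is a hedged sketch of these two methods; the retrying decorator, explicit parameters, and requests.put call are illustrative assumptions rather than the documented implementation:

    import requests
    from retrying import retry   # which retry decorator is actually used is an assumption

    def chunked_reader(self, filename, chunk_size=1024 * 1024):
        # Yield the file in fixed-size chunks so the upload can stream
        # without loading the whole file into memory.
        with open(filename, "rb") as fh:
            for chunk in iter(lambda: fh.read(chunk_size), b""):
                yield chunk

    @retry(stop_max_attempt_number=5, wait_exponential_multiplier=1000)
    def do_upload(self, filepath, upload_url):
        # A generator can only be consumed once, so it must be recreated on
        # every attempt; that is why the retry wraps this whole method rather
        # than the underlying request call.
        response = requests.put(upload_url, data=self.chunked_reader(filepath))
        response.raise_for_status()
        return response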
do_work(job, thread_int)¶
This needs to be implemented for each worker type. The work task from the in_queue is passed as the job argument.
Returns the result to be passed to the out_queue.
class conductor.lib.uploader.Uploader(args=None)¶
Bases: object
static convert_byte_count_to_string(byte_count, transfer_rate=False)¶
Converts a byte count to a string denoting its size in GB/MB/KB.
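A simple sketch of such a conversion; the thresholds, precision, and the "/s" suffix used when transfer_rate is True are assumptions:

    def convert_byte_count_to_string(byte_count, transfer_rate=False):
        # Thresholds and formatting below are illustrative assumptions.
        if byte_count >= 1024 ** 3:
            value = "%.2f GB" % (byte_count / float(1024 ** 3))
        elif byte_count >= 1024 ** 2:
            value = "%.2f MB" % (byte_count / float(1024 ** 2))
        else:
            value = "%.2f KB" % (byte_count / 1024.0)
        if transfer_rate:
            value += "/s"
        return value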
static convert_time_to_string(time_remaining)¶
create_manager(project, md5_only=False)¶
create_print_status_thread()¶
create_report_status_thread()¶
static estimated_time_remaining(elapsed_time, percent_complete)¶
This method estimates the time remaining, given the elapsed time and the percent complete.
It uses the following formula:

    let t0 = elapsed time
        P  = percent complete (0 <= P <= 1)

    time_remaining = (t0 - t0 * P) / P

which is derived from:

    percent_complete = elapsed_time / (elapsed_time + time_remaining)
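As a quick sanity check of the formula (numbers chosen purely for illustration): if 30 seconds have elapsed and the transfer is 25% complete, then time_remaining = (30 - 30 * 0.25) / 0.25 = 22.5 / 0.25 = 90 seconds, i.e. the full transfer is expected to take 120 seconds in total.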
handle_upload_response(project, upload_files, upload_id=None, md5_only=False)¶
This is a really confusing method and should probably be split into two clear logic branches: one that is called when in daemon mode, and one that is not. If not called in daemon mode (local_upload=True), then md5_only is True and project is not None. Otherwise we're in daemon mode, where the project information is not required because the daemon will only be fed uploads by the app, which have valid projects attached to them.
main(run_one_loop=False)¶
mark_upload_failed(error_message, upload_id)¶
mark_upload_finished(upload_id, upload_files)¶
prepare_workers()¶
print_status()¶
report_status()¶
return_md5s()¶
Return a dictionary of the filepaths and their md5s that were generated upon uploading.
sleep_time = 10¶
upload_status_text()¶
conductor.lib.uploader.get_file_info(filepath)¶
For the given filepath, return the following information in a dictionary:
- “filepath”: filepath (str)
- “modtime”: modification time (datetime.datetime)
- “size”: filesize in bytes (int)
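A minimal sketch of a function with this contract (the real implementation may differ, for example in how it handles symlinks or missing files):

    import datetime
    import os

    def get_file_info(filepath):
        # Stat the file once and expose the fields described above.
        stat = os.stat(filepath)
        return {
            "filepath": filepath,
            "modtime": datetime.datetime.fromtimestamp(stat.st_mtime),
            "size": stat.st_size,
        }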
conductor.lib.uploader.resolve_arg(arg_name, args, config)¶
Helper function to resolve the value of an argument.
The order of resolution (sketched below) is:
1. Check whether the user explicitly specified the argument when calling/instantiating the class. If so, use it; otherwise...
2. Attempt to read it from the config.yml. Note that the config also queries environment variables to populate itself with values. If the value is in the config, use it; otherwise...
3. Return None.
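A minimal sketch of that resolution order, assuming args and config behave like dictionaries; the actual lookup details are not documented here:

    def resolve_arg(arg_name, args, config):
        # 1. An explicitly supplied argument wins.
        value = args.get(arg_name)
        if value is not None:
            return value
        # 2. Fall back to the config (which itself may have been populated
        #    from config.yml and environment variables).
        if arg_name in config:
            return config[arg_name]
        # 3. Nothing found.
        return None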
conductor.lib.uploader.resolve_args(args)¶
Resolve all arguments, reconciling differences between command line args and config.yml args. See the resolve_arg function.
conductor.lib.uploader.run_uploader(args)¶
Start the uploader process. This process will run indefinitely, polling the Conductor cloud app for files that need to be uploaded.
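A hedged sketch of the entry-point flow, built only from the functions documented in this module; the "log_level" and "log_dir" keys are assumptions, and the indefinite polling itself lives inside Uploader.main():

    def run_uploader(args):
        # Reconcile command line arguments with config.yml, configure logging,
        # then hand off to the Uploader, whose main() loop polls the Conductor
        # cloud app indefinitely for files that need to be uploaded.
        resolved_args = resolve_args(args)
        set_logging(level=resolved_args.get("log_level"),
                    log_dirpath=resolved_args.get("log_dir"))
        uploader = Uploader(resolved_args)
        uploader.main()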
conductor.lib.uploader.set_logging(level=None, log_dirpath=None)¶