conductor.lib.uploader_v2 module
Uploader daemon v2
- class conductor.lib.uploader_v2.Backend
  Interface to the backend (FileIO service).
- classmethod bearer_token(**kw)
- classmethod fail(upload, location=None, bytes_downloaded=0)
  Tell the backend about an upload failure.
- classmethod fail_unsigned(upload, location=None)
  Tell the backend about an upload failure.
- classmethod finish(upload, location=None, bytes_downloaded=0)
  Tell the backend about an upload success.
- classmethod get(**kwargs)
  Return a list of items.
- classmethod headers()
- static make_url(path)
- classmethod next(account, project=None, location=None, number=1)
  Return the next upload (dict), or None if there isn’t one.
- classmethod post(**kwargs)
  Call requests.post.
- classmethod put(**kwargs)
  Call requests.put.
- classmethod put_file(filegen, signed_url)
  Upload a file into GCS, streaming its data from filegen to signed_url.
- classmethod sign(upload, location=None)
  Sign an upload payload.
- classmethod touch(upload, location=None, bytes_downloaded=0)
  Update the backend with the upload status.
-
classmethod
- class conductor.lib.uploader_v2.FileGenerator(upload_file, chunk_size=1048576, event_handler=None)
  Bases: object
  Because requests.put() can take a generator as its data parameter to allow streaming uploads, this class can be used to create a generator from a file path. Optionally, it can take a function or callable class instance as an event handler, which is called each time a chunk of the file is read (chunk_size defaults to 1048576 bytes, i.e. 1 MiB).
- next()
  Generator implementation.
- progress_event()
  Fire the event-handler callback for a progress event.
- stop_event()
  Fire the event-handler callback for a stop event.
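The chunked-read pattern described for FileGenerator can be sketched with the standard library alone. This is a minimal illustration, not the module's implementation. It assumes the handler is called as `handler(filegen, event)` with the hypothetical event names "progress" and "stop", mirroring the worker's `handle_upload_event(filegen, event)` signature. It also uses `__iter__` in place of the listed `next()` method.

```python
import os


class FileGenerator(object):
    """Sketch: stream a file in fixed-size chunks, firing an optional callback.

    Assumption: the handler is called as handler(filegen, event) with the
    hypothetical event names "progress" and "stop".
    """

    def __init__(self, upload_file, chunk_size=1048576, event_handler=None):
        self.upload_file = upload_file
        self.chunk_size = chunk_size
        self.event_handler = event_handler
        self.bytes_read = 0
        self.total_size = os.path.getsize(upload_file)

    def __iter__(self):
        # Yield raw bytes one chunk at a time so the file is never fully in memory.
        with open(self.upload_file, "rb") as fh:
            while True:
                chunk = fh.read(self.chunk_size)
                if not chunk:
                    break
                self.bytes_read += len(chunk)
                self.progress_event()  # fired as each chunk is read
                yield chunk
        self.stop_event()  # fired once the whole file has been consumed

    def progress_event(self):
        if self.event_handler is not None:
            self.event_handler(self, "progress")

    def stop_event(self):
        if self.event_handler is not None:
            self.event_handler(self, "stop")
```

An instance, or `iter(instance)`, can then be passed as the `data` argument of `requests.put()`, which iterates over it to send the request body in pieces.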
- class conductor.lib.uploader_v2.Uploader(args)
  Bases: object
  Main uploader process. It:
  - launches the workers
  - keeps track of run states, etc.
- RESULTS_MAX = 256
- STATE_RUNNING = 'running'
- STATE_STOPPING = 'stopping'
- create_log_history()
- log_uptime()
  Return the amount of time the uploader has been running, e.g. “0:01:28”.
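One way to produce the “0:01:28” format is to convert elapsed seconds to a `datetime.timedelta`, whose `str()` renders as H:MM:SS. The sketch assumes the uploader records its start time with `time.time()`. The helper name `format_uptime` and its `now` parameter are hypothetical.

```python
import datetime
import time


def format_uptime(start_time, now=None):
    """Return elapsed time since start_time as H:MM:SS, e.g. "0:01:28".

    start_time and now are epoch seconds (as from time.time()); now defaults
    to the current time. Fractional seconds are dropped.
    """
    if now is None:
        now = time.time()
    elapsed = datetime.timedelta(seconds=int(now - start_time))
    return str(elapsed)
```

Past 24 hours, `timedelta` prefixes a day count, e.g. "1 day, 1:00:00".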
- sigint_handler(sig, frm)
  Handle the SIGINT signal (i.e. the KeyboardInterrupt Python exception) by calling the Uploader’s stop_daemon method.
  Note that when this function is registered as the SIGINT handler, it is also called by ALL child processes. Because of this, it checks which process is calling it and only acts if that is the main (parent) process.
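The parent-only check can be sketched by recording the parent's PID when the handler's owner is created. A forked child inherits the stored value but has a different `os.getpid()`, so it returns without acting. `SketchUploader` is a hypothetical stand-in, and comparing PIDs is an assumption about how "which process is calling" is determined. Checking `multiprocessing.current_process().name` would be an alternative.

```python
import os
import signal


class SketchUploader(object):
    """Hypothetical stand-in showing only the SIGINT handling pattern."""

    def __init__(self):
        self._parent_pid = os.getpid()  # PID of the process that owns the workers
        self.stopped = False

    def stop_daemon(self):
        # The real method changes worker run states and raises to exit.
        self.stopped = True

    def sigint_handler(self, sig, frm):
        # Child processes inherit this handler; only the parent should act.
        if os.getpid() != self._parent_pid:
            return
        self.stop_daemon()

    def register(self):
        signal.signal(signal.SIGINT, self.sigint_handler)
```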
- start_daemon()
  Start the uploader daemon: create all child worker processes, start them, and join them. This is a blocking function and does not return until all worker processes have exited.
- stop_daemon()
  Stop the uploader daemon and exit the application:
  - Change the run_state value of each worker’s shared object. This triggers each worker’s stop/cleanup behavior.
  - Exit the application runtime by raising an exception (this is needed to break out of the join in the start_daemon method).
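The two shutdown steps can be sketched as follows. The sketch assumes each worker's shared run state is an object with a `value` attribute. `RunState` is a hypothetical stand-in for something like a multiprocessing shared value, and `SystemExit` is an assumption about which exception is raised.

```python
STATE_RUNNING = "running"
STATE_STOPPING = "stopping"


class RunState(object):
    """Hypothetical stand-in for the shared object each worker polls."""

    def __init__(self, value=STATE_RUNNING):
        self.value = value


def stop_daemon(run_states):
    """Sketch: signal every worker to stop, then unwind the blocking join."""
    for state in run_states:
        # Workers see this on their next check and run their cleanup.
        state.value = STATE_STOPPING
    # Raising breaks the parent out of start_daemon's join().
    raise SystemExit("uploader stopped")
```

Setting the states before raising gives workers a chance to finish cleanup even though the parent stops waiting on them.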
- class conductor.lib.uploader_v2.UploaderWorker(run_state, results_queue, account=None, location=None, project=None)
  Bases: multiprocessing.process.Process
  Worker process for uploads.
  Its job is to periodically ask the server for an upload object, such as:
  {u'account': u'testaccountdomain', u'bytes_transferred': 0, u'filepath': u'/home/testers/Nuke/Nuke/nuke_test/img/sword_gas-l.png', u'filesize': 1234, u'gcs_url': u'https://storage.googleapis.com/..snip..', u'id': u'bd55f770c8e0566e272243d7555cf506', u'jid': None, u'location': None, u'md5': u'vVX3cMjgVm4nIkPXVVz1Bg==', u'priority': 1, u'status': None, u'total_size': 12934, u'ulid': u'5273391011463168'}
  It then creates a FileGenerator() object from the filepath, registering the self.event_handler() method as the callback fired for each chunk uploaded to GCS. It knows how to handle success and failure events from the upload stream.
  On success, it cleans up any state and goes back to asking for the next upload.
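One pass of this poll-upload-report cycle can be sketched as a single function. Every argument is a hypothetical callable standing in for the worker and Backend methods: `next_upload()` returns an upload dict or None, `put_upload(upload)` streams the file, and `finish(upload)` or `fail(upload)` reports the outcome. The real worker keeps this state on `self` and loops indefinitely.

```python
def handle_one_upload(next_upload, put_upload, finish, fail):
    """Sketch of one polling cycle; returns a status string for illustration."""
    upload = next_upload()
    if upload is None:
        return "idle"  # nothing queued; the real worker would wait() and poll again
    try:
        put_upload(upload)
    except Exception:
        fail(upload)  # any streaming error is reported back as a failure
        return "failed"
    finish(upload)  # success: report it, then the caller resets and polls again
    return "finished"
```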
- file_md5(filepath)
  Compute the MD5 checksum of a file.
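The md5 value in the example upload object is 24 characters ending in "==". That is the shape of a base64-encoded 16-byte MD5 digest, so this sketch assumes file_md5 returns base64 rather than hex. It hashes the file in chunks to keep memory flat. `md5_matches` is a hypothetical helper in the spirit of maybe_upload's "md5 matches expectation" check.

```python
import base64
import hashlib


def file_md5(filepath, chunk_size=1048576):
    """Return the base64-encoded MD5 digest of a file, read in chunks.

    Assumption: base64 (not hex) encoding, matching the upload object's md5 field.
    """
    hasher = hashlib.md5()
    with open(filepath, "rb") as fh:
        # Read fixed-size chunks so large files are never loaded whole.
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            hasher.update(chunk)
    return base64.b64encode(hasher.digest()).decode("ascii")


def md5_matches(filepath, expected_md5):
    """Hypothetical check: True if the file's digest equals the expected one."""
    return file_md5(filepath) == expected_md5
```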
- handle_finish(result)
  Callback for finish/success.
- handle_potential_upload()
  Process an upload JSON object.
- handle_put_error(err, fileobj)
  Callback for upload errors.
- handle_put_progress(filegen)
  Callback for upload progress.
- handle_put_success(filegen)
  Callback for finish/success.
- handle_upload_event(filegen, event)
  Callback for upload events.
- log(message, level=10, fields=None)
  Create a formatted log message from the passed-in data, automatically add process info, and call LOGGER with the result. The default level, 10, is logging.DEBUG.
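A minimal sketch of such a formatter prefixes the message with the current process name and PID and appends any `fields` as sorted key=value pairs. The exact layout and the logger name "uploader_v2_sketch" are assumptions, not the module's real format.

```python
import logging
import multiprocessing
import os

LOGGER = logging.getLogger("uploader_v2_sketch")  # hypothetical logger name


def log(message, level=logging.DEBUG, fields=None):
    """Sketch: format a message with process info and extra fields, then log it."""
    proc = multiprocessing.current_process()
    parts = ["[%s pid=%d]" % (proc.name, os.getpid()), message]
    # Sort keys so the output is stable regardless of dict ordering.
    for key in sorted(fields or {}):
        parts.append("%s=%s" % (key, fields[key]))
    formatted = " ".join(parts)
    LOGGER.log(level, formatted)
    return formatted
```

Tagging every line with the PID makes interleaved output from several worker processes separable after the fact.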
- maybe_touch()
- maybe_upload()
  Upload a file if its md5 matches the expected value.
- md5_for_current_upload()
- next_upload()
  Get the next upload.
- put_upload()
  Put the upload into GCS.
- reset()
  Reset state.
- run()
  Method to be run in the sub-process; can be overridden in a subclass.
- stop()
  Called at the end of shutdown.
- touch()
- wait()
  Sleep for WORKER_SLEEP_DURATION.
  Instead of one long sleep call, it makes a loop of many short sleep calls. This gives it the opportunity to check the running state and exit the sleep early if necessary.
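The interruptible sleep can be sketched as a loop of short `time.sleep()` calls against a monotonic deadline. The source does not give WORKER_SLEEP_DURATION's value, so 15 seconds below is a placeholder. `get_run_state` is a hypothetical accessor for the worker's shared run state.

```python
import time

WORKER_SLEEP_DURATION = 15  # seconds; placeholder value, not from the source
STATE_RUNNING = "running"


def wait(get_run_state, duration=WORKER_SLEEP_DURATION, interval=0.1):
    """Sleep up to `duration` seconds in short steps.

    Returns True if the full duration elapsed, or False if the run state
    stopped being "running" partway through.
    """
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        if get_run_state() != STATE_RUNNING:
            return False  # stop requested: leave the sleep early
        # Never sleep past the deadline on the final step.
        time.sleep(min(interval, max(0.0, deadline - time.monotonic())))
    return True
```

A shorter `interval` makes shutdown more responsive at the cost of more frequent wakeups.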
- conductor.lib.uploader_v2.resolve_arg(arg_name, args, config)
  Helper function to resolve the value of an argument.
  The order of resolution is:
  - Check whether the user explicitly specified the argument when calling/instantiating the class. If so, use it; otherwise…
  - Attempt to read it from config.yml. Note that the config also queries environment variables to populate itself with values. If the value is in the config, use it; otherwise…
  - Return None.
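The resolution order above maps onto a short fallback chain. This sketch assumes `args` and `config` behave like dicts and that "not specified" means the key is missing or its value is None. Falsy values such as 0 or "" therefore still count as explicitly set.

```python
def resolve_arg(arg_name, args, config):
    """Return args[arg_name] if set, else config[arg_name], else None."""
    value = args.get(arg_name)
    if value is not None:
        return value  # explicitly specified by the caller
    value = config.get(arg_name)
    if value is not None:
        return value  # from config.yml (which may itself read environment variables)
    return None
```

For example, with `args = {"project": None}` and `config = {"project": "default"}`, `resolve_arg("project", args, config)` falls through to the config value.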
- conductor.lib.uploader_v2.resolve_args(args)
  Resolve all arguments, reconciling differences between command-line args and config.yml args. See the resolve_arg function.
- conductor.lib.uploader_v2.run_uploader(args)
  Start the uploader process. This process runs indefinitely, polling the Conductor cloud app for files that need to be uploaded.
- conductor.lib.uploader_v2.set_logging(level=None, log_dirpath=None)
  Set the logging level and path.
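A minimal sketch of "set level and path" with the standard `logging` module sets a logger's level and, when a directory is given, attaches a `FileHandler` inside it. The logger name, the file name `conductor_uploader.log`, the INFO default, and the format string are all hypothetical choices, not the module's actual values.

```python
import logging
import os

LOG_FILENAME = "conductor_uploader.log"  # hypothetical file name


def set_logging(level=None, log_dirpath=None, logger_name="conductor"):
    """Sketch: set a logger's level and optionally log to a file in log_dirpath."""
    logger = logging.getLogger(logger_name)
    logger.setLevel(level if level is not None else logging.INFO)
    if log_dirpath:
        if not os.path.isdir(log_dirpath):
            os.makedirs(log_dirpath)
        handler = logging.FileHandler(os.path.join(log_dirpath, LOG_FILENAME))
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger
```

Calling this repeatedly with the same directory would attach duplicate handlers. A fuller version would check `logger.handlers` first.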