conductor.lib.uploader_v2 module

Uploader daemon v2

class conductor.lib.uploader_v2.Backend

Interface to backend (FileIO service)

classmethod bearer_token(**kw)
classmethod fail(upload, location=None, bytes_downloaded=0)

Tell backend about upload failure

classmethod fail_unsigned(upload, location=None)

Tell backend about upload failure

classmethod finish(upload, location=None, bytes_downloaded=0)

Tell backend about upload success

classmethod get(**kwargs)

Return a list of items

classmethod headers()
static make_url(path)
classmethod next(account, project=None, location=None, number=1)

Return the next upload (dict), or None if there isn’t one.

classmethod post(**kwargs)

Call requests post

classmethod put(**kwargs)

Call requests put

classmethod put_file(filegen, signed_url)

Upload a file (streaming) into GCS

classmethod sign(upload, location=None)

Sign an upload payload

classmethod touch(upload, location=None, bytes_downloaded=0)

Update backend with upload status

class conductor.lib.uploader_v2.FileGenerator(upload_file, chunk_size=1048576, event_handler=None)

Bases: object

Since requests.put() can take a generator as the data param in order to allow for streaming uploads, this class can be used to create a generator from a filepath. Optionally, it can take a function/callable class instance as an event handler. The handler will be called when each chunk of the file is read.
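The chunked-read behavior described above can be illustrated with a minimal sketch. It is not the module's implementation: ChunkedFileReader, the "progress"/"stop" event names, and the handler signature are assumptions made for illustration only.

```python
import os
import tempfile


class ChunkedFileReader(object):
    """Hypothetical stand-in for FileGenerator (illustration only)."""

    def __init__(self, path, chunk_size=1048576, event_handler=None):
        self.path = path
        self.chunk_size = chunk_size
        self.event_handler = event_handler  # assumed signature: handler(reader, event)
        self.bytes_read = 0

    def __iter__(self):
        with open(self.path, "rb") as handle:
            while True:
                chunk = handle.read(self.chunk_size)
                if not chunk:
                    break
                self.bytes_read += len(chunk)
                # Notify the handler each time a chunk has been read.
                if self.event_handler is not None:
                    self.event_handler(self, "progress")
                yield chunk
        # Notify the handler once the file is exhausted.
        if self.event_handler is not None:
            self.event_handler(self, "stop")


def demo_chunked_read():
    """Read a 2500-byte temporary file in 1024-byte chunks and record events."""
    events = []
    fd, path = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(b"x" * 2500)
        reader = ChunkedFileReader(
            path,
            chunk_size=1024,
            event_handler=lambda r, e: events.append((e, r.bytes_read)),
        )
        data = b"".join(reader)
    finally:
        os.remove(path)
    return data, events
```

An iterable like this could be passed as the data argument of requests.put() to stream the file body instead of loading it into memory.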

next()

Generator implementation

progress_event()

Fire callback

stop_event()

Fire callback

class conductor.lib.uploader_v2.Uploader(args)

Bases: object

Main Uploader process.

  • Launch workers.
  • Keep run states, etc.
RESULTS_MAX = 256
STATE_RUNNING = 'running'
STATE_STOPPING = 'stopping'
create_log_history()
log_uptime()

Return the amount of time that the uploader has been running, e.g. “0:01:28”.

sigint_handler(sig, frm)

Handles the SIGINT signal (i.e. the KeyboardInterrupt python exception).

This simply calls the Uploader’s stop_daemon method.

Note that when this function is registered as the SIGINT handler, it gets called by ALL child processes as well. Because of this, we check which process is calling it, and only execute it if it’s the main (parent) process.
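One way to implement the parent-only check is to remember the parent's PID when the handler is created and compare it on every call. This is a sketch under that assumption; ParentOnlyHandler and on_stop are hypothetical names, and the module may detect the parent process differently.

```python
import os
import signal


class ParentOnlyHandler(object):
    """Signal handler that acts only in the process that created it."""

    def __init__(self, on_stop):
        self.parent_pid = os.getpid()  # recorded in the main (parent) process
        self.on_stop = on_stop         # hypothetical callback, e.g. stop_daemon

    def __call__(self, sig, frm):
        # Forked children inherit the handler but have a different PID.
        if os.getpid() != self.parent_pid:
            return False
        self.on_stop()
        return True
```

The handler would be registered from the main thread with signal.signal(signal.SIGINT, handler).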

start_daemon()

Start the uploader daemon. Create all child worker processes, start them, and join them. This is a blocking function and will not return until all workers have exited their process.

stop_daemon()

Stop the uploader daemon and exit the application.

  1. Change the run_state value of each worker’s shared object value. This will trigger each worker’s stop/cleanup behaviors.
  2. Exit the application runtime by raising an exception (we must break out of the join that occurred in the start_daemon method).
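The two steps above can be sketched as follows. This is an illustration, not the module's code: it assumes each worker's run state is a multiprocessing.Value holding an integer code (the real class uses the string constants STATE_RUNNING and STATE_STOPPING), and DaemonExit, stop_all, and worker_should_continue are hypothetical names.

```python
import multiprocessing

# Integer stand-ins for the 'running' and 'stopping' states (assumption).
RUNNING, STOPPING = 1, 0


class DaemonExit(Exception):
    """Raised in the parent to break out of the blocking join."""


def stop_all(run_states):
    # Step 1: flip every worker's shared flag so its loop can wind down.
    for state in run_states:
        with state.get_lock():
            state.value = STOPPING
    # Step 2: leave the parent's blocking join by raising.
    raise DaemonExit("stopping uploader")


def worker_should_continue(run_state):
    # A worker would poll this between units of work.
    return run_state.value == RUNNING
```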
class conductor.lib.uploader_v2.UploaderWorker(run_state, results_queue, account=None, location=None, project=None)

Bases: multiprocessing.process.Process

Worker process for uploads.

Its job is to periodically ask the server for an upload object:

{u'account': u'testaccountdomain',
 u'bytes_transferred': 0,
 u'filepath': u'/home/testers/Nuke/Nuke/nuke_test/img/sword_gas-l.png',
 u'filesize': 1234,
 u'gcs_url': u'https://storage.googleapis.com/..snip..',
 u'id': u'bd55f770c8e0566e272243d7555cf506',
 u'jid': None,
 u'location': None,
 u'md5': u'vVX3cMjgVm4nIkPXVVz1Bg==',
 u'priority': 1,
 u'status': None,
 u'total_size': 12934,
 u'ulid': u'5273391011463168'}

It will then create a FileGenerator() object from the filepath, registering the self.event_handler() method as the callback to be fired for each chunk uploaded to GCS.

It knows how to handle success and failure events from the upload stream.

On success, it will clean up any state and go back to asking for the next upload.

file_md5(filepath)

Compute the MD5 checksum of a file.
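A chunked MD5 computation might look like the sketch below. The base64 encoding of the digest is an assumption inferred from the shape of the md5 field in the sample upload object; md5_base64 is a hypothetical helper, not the method's actual body.

```python
import base64
import hashlib


def md5_base64(path, chunk_size=1048576):
    """Return the base64-encoded MD5 digest of the file at path."""
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        # Read in fixed-size chunks so large files are not loaded at once.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return base64.b64encode(digest.digest()).decode("ascii")
```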

handle_finish(result)

Callback for finish/success

handle_potential_upload()

Process an upload json object

handle_put_error(err, fileobj)

Callback for upload error

handle_put_progress(filegen)

Callback for upload progress

handle_put_success(filegen)

Callback for finish/success

handle_upload_event(filegen, event)

Callback for uploads

log(message, level=10, fields=None)

Create a formatted log message from the passed-in data, automatically add process info, and call LOGGER with the result.

maybe_touch()
maybe_upload()

Upload a file if md5 matches expectation.

md5_for_current_upload()
next_upload()

Get the next upload

put_upload()

Put the upload into GCS

reset()

Reset state

run()

Method to be run in sub-process; can be overridden in sub-class

stop()

Call at the end of shutdown

touch()
wait()

Sleep for WORKER_SLEEP_DURATION.

Instead of doing one long sleep call, we make a loop of many short sleep calls. This gives the opportunity to check the running state, and exit the sleep process if necessary.
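The short-sleep loop can be sketched like this. The function name, the step size, and the is_running callable are assumptions for illustration; the real method reads the worker's own run state and WORKER_SLEEP_DURATION.

```python
import time


def interruptible_sleep(duration, is_running, step=0.5, sleep=time.sleep):
    """Sleep for duration seconds in short steps, returning early on stop.

    Returns True if the full duration elapsed, False if interrupted.
    """
    slept = 0.0
    while slept < duration:
        # Check the run state before each short sleep.
        if not is_running():
            return False
        interval = min(step, duration - slept)
        sleep(interval)
        slept += interval
    return True
```

The sleep parameter exists only so the loop can be exercised without real delays.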

conductor.lib.uploader_v2.resolve_arg(arg_name, args, config)

Helper function to resolve the value of an argument.

The order of resolution is:

  1. Check whether the user explicitly specified the argument when calling/instantiating the class. If so, then use it, otherwise…
  2. Attempt to read it from the config.yml. Note that the config also queries environment variables to populate itself with values. If the value is in the config then use it, otherwise…
  3. Return None.
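The resolution order above can be sketched as follows, assuming args and config are both dict-like and that a value of None means "not specified". resolve_value is a hypothetical name; the real function's handling of unset arguments may differ.

```python
def resolve_value(arg_name, args, config):
    """Resolve an argument: explicit value first, then config, else None."""
    # 1. An explicitly supplied argument wins.
    value = args.get(arg_name)
    if value is not None:
        return value
    # 2. Fall back to the config; 3. dict.get returns None if it is absent.
    return config.get(arg_name)
```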
conductor.lib.uploader_v2.resolve_args(args)

Resolve all arguments, reconciling differences between command line args and config.yml args. See the resolve_arg function.

conductor.lib.uploader_v2.run_uploader(args)

Start the uploader process. This process will run indefinitely, polling the Conductor cloud app for files that need to be uploaded.

conductor.lib.uploader_v2.set_logging(level=None, log_dirpath=None)

Set logging level and path.