API
A library for splitting Python workflows into separate tasks.
advice
Contains classes and functions for advising tasks.
Advice
Interface that needs to be implemented by an advice.
The interface is quite simple. One has to implement two methods, `before(session)` and `after(session)`, which are called during the process of advising a task execution. Both take a single argument, `session`, which contains an instance of the class `Session`. This object allows the individual advices to influence the task execution by changing the way the task is called or by making changes to the result.
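As an illustration of this interface, here is a minimal sketch of a custom advice that measures how long a task execution takes. `TimingAdvice`, `FakeSession` and the context keys are hypothetical names; the sketch only relies on `session.context`, `session.proceed()` and the `Advice` base class described on this page, and falls back to a small stand-in base class so that it also runs without bandsaw installed.

```python
import time

try:
    from bandsaw.advice import Advice  # the base class documented on this page
except ImportError:
    class Advice:
        """Stand-in with the documented default behaviour, used without bandsaw."""

        def before(self, session):
            session.proceed()

        def after(self, session):
            session.proceed()


class TimingAdvice(Advice):
    """Hypothetical advice that records how long a task execution took."""

    def before(self, session):
        # session.context carries values from before() to after(), as
        # CachingAdvice does with 'cache-item-path'.
        session.context['timing-start'] = time.time()
        session.proceed()

    def after(self, session):
        start = session.context.get('timing-start')
        if start is not None:
            session.context['timing-seconds'] = time.time() - start
        session.proceed()


class FakeSession:
    """Hypothetical stand-in for bandsaw.session.Session, for trying out the advice."""

    def __init__(self):
        self.context = {}
        self.proceed_calls = 0

    def proceed(self):
        self.proceed_calls += 1


session = FakeSession()
advice = TimingAdvice()
advice.before(session)  # here the next advice or the task would run
advice.after(session)
print(f"task took {session.context['timing-seconds']:.6f} s")
```

Wall-clock `time.time()` is used for the start timestamp because, unlike `time.perf_counter()`, its values are comparable across processes, which matters if a session is saved and continued elsewhere.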
after(self, session)
Called after the task is actually executed.
This method allows the individual advice to make changes to the result of the task execution. The result can be retrieved from the `session`.
In order to continue, the advice MUST either call `session.proceed()`, which will continue the process with the current `result` and the next advice in the advice chain, or call `session.conclude(result)` with a `Result` instance, which will set a different result and continue with it.
The default implementation just calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
Source code in bandsaw/advice.py
def after(self, session):  # pylint: disable=R0201 # no-self-use
    """
    Called after the task is actually executed.

    This method allows the individual advice to make changes to the result of
    the task execution. The result can be retrieved from the `session`.

    In order to continue, the advice MUST either call `session.proceed()`, which
    will continue the process with the current `result` and the next advice in the
    advice chain, or call `session.conclude(result)` with a `Result` instance,
    which will set a different result and continue with it.

    The default implementation just calls `session.proceed()`.

    Args:
        session (bandsaw.session.Session): The session of the execution.
    """
    session.proceed()
before(self, session)
Called before the task is actually executed.
This method allows the individual advice to make changes to the way the task is later executed. In order to continue, the advice MUST either call `session.proceed()`, which will continue the process with the next advice in the advice chain, or call `session.conclude(result)` with a `Result` instance, which will skip the following advices and return without executing the task at all.
The default implementation just calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
Source code in bandsaw/advice.py
def before(self, session):  # pylint: disable=R0201 # no-self-use
    """
    Called before the task is actually executed.

    This method allows the individual advice to make changes to the way the
    task is later executed. In order to continue, the advice MUST either
    call `session.proceed()`, which will continue the process with the next
    advice in the advice chain, or call `session.conclude(result)` with a `Result`
    instance, which will skip the following advices and return without executing
    the task at all.

    The default implementation just calls `session.proceed()`.

    Args:
        session (bandsaw.session.Session): The session of the execution.
    """
    session.proceed()
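The `conclude()` path can be illustrated with a sketch of an advice that keeps results in memory and short-circuits repeated executions, similar in spirit to `CachingAdvice` but without any files. `InMemoryCachingAdvice` and the `FakeSession` used to try it out are hypothetical; the code only uses session attributes that appear in the sources on this page (`task.task_id`, `execution.execution_id`, `result`, `proceed()`, `conclude()`).

```python
from types import SimpleNamespace

try:
    from bandsaw.advice import Advice
except ImportError:
    class Advice:
        """Stand-in with the documented default behaviour, used without bandsaw."""

        def before(self, session):
            session.proceed()

        def after(self, session):
            session.proceed()


class InMemoryCachingAdvice(Advice):
    """Hypothetical advice that reuses results within a single process."""

    def __init__(self):
        super().__init__()
        self._results = {}

    @staticmethod
    def _key(session):
        # The same identifiers CachingAdvice uses to build its cache path.
        return (session.task.task_id, session.execution.execution_id)

    def before(self, session):
        key = self._key(session)
        if key in self._results:
            # Skip the following advices and the task execution itself.
            session.conclude(self._results[key])
            return
        session.proceed()

    def after(self, session):
        # Keep the first result stored for this key.
        self._results.setdefault(self._key(session), session.result)
        session.proceed()


class FakeSession:
    """Hypothetical stand-in for bandsaw.session.Session that records calls."""

    def __init__(self, task_id, execution_id):
        self.task = SimpleNamespace(task_id=task_id)
        self.execution = SimpleNamespace(execution_id=execution_id)
        self.result = None
        self.calls = []

    def proceed(self):
        self.calls.append('proceed')

    def conclude(self, result):
        self.calls.append(('conclude', result))


advice = InMemoryCachingAdvice()
first = FakeSession('task-a', 'exec-1')
advice.before(first)       # cache miss -> proceed()
first.result = 'computed'  # pretend the task ran
advice.after(first)
second = FakeSession('task-a', 'exec-1')
advice.before(second)      # cache hit -> conclude('computed')
print(second.calls)        # [('conclude', 'computed')]
```

`setdefault()` keeps the first stored result, so the sketch behaves the same whether or not `after()` is also invoked after a `conclude()` in `before()`, which this page does not specify. Because the dictionary lives in the advice instance, results are only reused within one process.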
advise_task_with_chain(task, execution, configuration, advice_chain='default')
Executes a `Task` with additional advices.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | bandsaw.tasks.Task | The task to be executed. | required |
execution | bandsaw.execution.Execution | The execution definition for the task. | required |
configuration | bandsaw.config.Configuration | The configuration which should be used during advising. | required |
advice_chain | str | The name of the advice chain which contains the additional advices to be applied to the task. Defaults to 'default'. | 'default' |
Returns:
Type | Description |
---|---|
bandsaw.result.Result | The result of the task execution. |
Source code in bandsaw/advice.py
def advise_task_with_chain(task, execution, configuration, advice_chain='default'):
    """
    Executes a `Task` with additional advices.

    Args:
        task (bandsaw.tasks.Task): The task to be executed.
        execution (bandsaw.execution.Execution): The execution definition for the task.
        configuration (bandsaw.config.Configuration): The configuration which should
            be used during advising.
        advice_chain (str): The name of the advice chain which contains the additional
            advices to be applied to the task. Defaults to 'default'.

    Returns:
        bandsaw.result.Result: The result of the task execution.
    """
    session = Session(task, execution, configuration, advice_chain)
    return session.initiate()
advices
Package that contains reusable `Advice` classes.
cache
Contains an `Advice` that can cache task results in the local file system.
CachingAdvice (Advice)
Advice that caches results in the local file system.
Attributes:
Name | Type | Description |
---|---|---|
directory | pathlib.Path | The path to the directory where the results are cached. |
after(self, session)
Called after the task is actually executed.
Stores the result of the task execution in the cache file whose path `before()` recorded in `session.context['cache-item-path']`, creating missing parent directories first. An existing cache file is never overwritten. Afterwards the advice calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
Source code in bandsaw/advices/cache.py
def after(self, session):
    cache_item_path = pathlib.Path(session.context['cache-item-path'])
    if not cache_item_path.exists():
        cache_item_directory = cache_item_path.parent
        if not cache_item_directory.exists():
            cache_item_directory.mkdir(parents=True)
        logger.info("Storing result in cache '%s'", cache_item_path)
        with open(cache_item_path, 'wb') as stream:
            session.serializer.serialize(session.result, stream)
    session.proceed()
before(self, session)
Called before the task is actually executed.
Computes the cache path `directory / <task id> / <execution id>` and records it in `session.context['cache-item-path']` for `after()`. If a cache file exists at that path, its content is deserialized with the session's serializer and passed to `session.conclude()`, so the task is not executed again. Otherwise the advice calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
Source code in bandsaw/advices/cache.py
def before(self, session):
    artifact_id = session.task.task_id
    revision_id = session.execution.execution_id
    cache_item_path = self.directory / artifact_id / revision_id
    session.context['cache-item-path'] = str(cache_item_path)
    if cache_item_path.exists():
        logger.info("Using result from cache '%s'", cache_item_path)
        with open(cache_item_path, 'rb') as stream:
            result = session.serializer.deserialize(stream)
        session.conclude(result)
        return
    session.proceed()
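The cache layout used by `CachingAdvice`, one file per task id and execution id below `directory`, can be reproduced in a few standalone functions. This is a sketch, not bandsaw's code: `pickle` stands in for `session.serializer`, and the function names are hypothetical.

```python
import pathlib
import pickle
import tempfile


def cache_item_path(directory, task_id, execution_id):
    # Same layout as CachingAdvice.before(): <directory>/<task_id>/<execution_id>
    return pathlib.Path(directory) / task_id / execution_id


def load_cached(path):
    """Return (True, value) on a cache hit and (False, None) on a miss."""
    if not path.exists():
        return False, None
    with open(path, 'rb') as stream:
        return True, pickle.load(stream)


def store_cached(path, value):
    # Like CachingAdvice.after(): never overwrite an existing entry.
    if path.exists():
        return
    # exist_ok=True tolerates another process creating the directory first.
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, 'wb') as stream:
        pickle.dump(value, stream)


with tempfile.TemporaryDirectory() as cache_dir:
    path = cache_item_path(cache_dir, 'task-a', 'execution-1')
    store_cached(path, {'answer': 42})
    print(load_cached(path))  # (True, {'answer': 42})
```

As in `CachingAdvice`, entries are neither overwritten nor invalidated, so a stale result has to be removed by deleting its file. The `exist_ok=True` flag avoids the `FileExistsError` that a separate existence check followed by `mkdir()` can run into when two processes populate the cache at the same time.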
log
Contains an `Advice` implementation which adds logging.
LoggingAdvice (Advice)
An `Advice` which adds additional logging.
after(self, session)
Called after the task is actually executed.
Logs the task id, the execution id and the session context at INFO level, prefixed with `AFTER`, and then calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
Source code in bandsaw/advices/log.py
def after(self, session):
    logger.info(
        "AFTER %s:%s with context %s",
        session.task.task_id,
        session.execution.execution_id,
        session.context,
    )
    session.proceed()
before(self, session)
Called before the task is actually executed.
Logs the task id, the execution id and the session context at INFO level, prefixed with `BEFORE`, and then calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
Source code in bandsaw/advices/log.py
def before(self, session):
    logger.info(
        "BEFORE %s:%s with context %s",
        session.task.task_id,
        session.execution.execution_id,
        session.context,
    )
    session.proceed()
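`LoggingAdvice` logs its messages at INFO level. Python's root logger defaults to WARNING, so these messages only become visible once logging is configured, for example with `logging.basicConfig()` as sketched below (the format string is an arbitrary choice).

```python
import logging

# Show INFO messages, such as those emitted by LoggingAdvice.
# force=True (Python 3.8+) replaces handlers that were configured earlier.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(name)s %(levelname)s %(message)s',
    force=True,
)
```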
ssh
Contains code for running tasks remotely via SSH.
Remote
Definition of a remote machine.
Remotes are used for executing sessions on remote machines.
Attributes:
Name | Type | Description |
---|---|---|
host | str | The hostname of the machine where the interpreter is run. |
port | int | The port where ssh is listening for connections. Defaults to 22. |
user | str | The name of the user under which the interpreter will run. Defaults to the name of the local user. |
key_file | str | Path to a file containing the key to use for authentication. |
interpreter | bandsaw.interpreter.Interpreter | The interpreter which should be used, including its executable and Python path. |
directory | str | Remote directory where temporary files are stored. If |
SshAdvice (Advice)
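Advice that moves a session to a remote machine via SSH and proceeds it there.
__init__(self, directory=None, backend=SshCommandLineBackend())
Create a new instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
directory | str | The local directory where temporary files are stored to exchange data between the local and the remote machine. If `None`, a temporary directory is used. | None |
backend | bandsaw.advices.ssh.SshBackend | The backend used for the SSH operations. The default instance is created once, when the method is defined, and is shared by all advices that use the default. | SshCommandLineBackend() |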
Source code in bandsaw/advices/ssh.py
def __init__(self, directory=None, backend=SshCommandLineBackend()):
    """
    Create a new instance.

    Args:
        directory (str): The local directory where temporary files are stored to
            exchange data between the local and the remote machine. If `None` a
            temporary directory is used.
    """
    if directory is None:
        self.directory = pathlib.Path(tempfile.mkdtemp())
    else:
        self.directory = pathlib.Path(directory)
    logger.info("Using directory %s for exchange data", self.directory)
    self.remotes = {}
    self._backend = backend
    super().__init__()
add_remote(self, remote, name='default')
Add a new definition of a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Definition of the remote machine. | required |
name | str | Name of the remote. Defaults to `default`. | 'default' |
Returns:
Type | Description |
---|---|
bandsaw.advices.ssh.SshAdvice | The advice with the added remote. |
Source code in bandsaw/advices/ssh.py
def add_remote(self, remote, name='default'):
    """
    Add a new definition of a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Definition of the remote machine.
        name (str): Name of the remote. Defaults to `default`.

    Returns:
        bandsaw.advices.ssh.SshAdvice: The advice with the added remote.
    """
    self.remotes[name] = remote
    return self
after(self, session)
Called after the task is actually executed.
Logs the id of the current process and the result of the task execution, then returns without calling `session.proceed()` or `session.conclude()`. Unlike the general contract of `Advice.after()`, this is intentional: as the log messages state, returning ends the remote session, and the session continues in the local parent process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
Source code in bandsaw/advices/ssh.py
def after(self, session):
    logger.info("after called in process %d", os.getpid())
    logger.info("Remote process created result %s", session.result)
    logger.info("Returning to end remote session and continue in parent")
before(self, session)
Called before the task is actually executed.
Moves the session to a remote machine and continues it there. The advice writes the session to a local file in `directory`, selects the remote named by the `remote` entry of the task's `ssh` keyword argument (defaulting to `'default'`), creates a run directory on it, and copies the distribution archive and the session file over. It then runs the archive with the remote interpreter, copies the resulting session back, deletes the remote run directory, restores the local session from the copied file and finally calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
Source code in bandsaw/advices/ssh.py
def before(self, session):
    session_id = session.execution.execution_id
    session_in_path = pathlib.Path(
        tempfile.mktemp('.zip', 'in-' + session_id + '-', self.directory)
    )
    session_out_path = pathlib.Path(
        tempfile.mktemp('.zip', 'out-' + session_id + '-', self.directory)
    )
    logger.info("Writing session to %s", session_in_path)
    with io.FileIO(str(session_in_path), mode='w') as stream:
        session.save(stream)

    remote_name = session.task.kwargs.get('ssh', {}).get('remote', 'default')
    remote = self.remotes[remote_name]
    remote_run_directory = remote.directory / session.execution.execution_id
    logger.info(
        "Creating run directory %s on host %s",
        remote_run_directory,
        remote.host,
    )
    self._backend.create_dir(
        remote,
        remote_run_directory,
    )

    logger.info("Copying over distribution archive to host %s", remote.host)
    distribution_archive_path = session.distribution_archive.path
    remote_distribution_archive_path = (
        remote_run_directory / distribution_archive_path.name
    )
    self._backend.copy_file_to_remote(
        remote,
        distribution_archive_path,
        remote_distribution_archive_path,
    )

    logger.info("Copying over session to host %s", remote.host)
    remote_session_in_path = remote_run_directory / session_in_path.name
    self._backend.copy_file_to_remote(
        remote,
        session_in_path,
        remote_session_in_path,
    )

    remote_session_out_path = remote_run_directory / session_out_path.name
    logger.info(
        "Running remote process using interpreter %s", remote.interpreter.executable
    )
    self._backend.execute_remote(
        remote,
        remote.interpreter.executable,
        str(remote_distribution_archive_path),
        '--input',
        str(remote_session_in_path),
        '--output',
        str(remote_session_out_path),
        '--run-id',
        session.run_id,
    )
    # environment = self.interpreter.environment
    # environment['PYTHONPATH'] = ':'.join(self.interpreter.path)
    logger.info("Remote process exited")

    logger.info("Copying over session result from host %s", remote.host)
    self._backend.copy_file_from_remote(
        remote,
        remote_session_out_path,
        session_out_path,
    )

    logger.info(
        "Cleaning up remote directory %s on host %s",
        remote_run_directory,
        remote.host,
    )
    self._backend.delete_dir(remote, remote_run_directory)

    logger.info("Restore local session from %s", session_out_path)
    with io.FileIO(str(session_out_path), mode='r') as stream:
        session.restore(stream)

    logger.info("Proceed local session")
    session.proceed()
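`before()` picks the remote from the task's keyword arguments: the `remote` entry of an `ssh` dictionary names the remote, and `'default'` is used otherwise. Since `add_remote()` returns the advice itself, several remotes can be registered in one chained expression. The following sketch mirrors just this lookup with hypothetical stand-ins (`RemoteRegistry`, `select_remote`, and `SimpleNamespace` objects in place of `Remote`), so that it runs without bandsaw or SSH access; how a task receives its `kwargs` is not covered here.

```python
from types import SimpleNamespace


class RemoteRegistry:
    """Hypothetical stand-in for the remote handling of SshAdvice."""

    def __init__(self):
        self.remotes = {}

    def add_remote(self, remote, name='default'):
        self.remotes[name] = remote
        return self  # returning self allows chaining, as in SshAdvice


def select_remote(registry, task_kwargs):
    # Same lookup as SshAdvice.before(); an unknown name raises KeyError.
    remote_name = task_kwargs.get('ssh', {}).get('remote', 'default')
    return registry.remotes[remote_name]


registry = (
    RemoteRegistry()
    .add_remote(SimpleNamespace(host='build.example.com'))
    .add_remote(SimpleNamespace(host='gpu.example.com'), name='gpu')
)
print(select_remote(registry, {}).host)                          # build.example.com
print(select_remote(registry, {'ssh': {'remote': 'gpu'}}).host)  # gpu.example.com
```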
SshBackend (ABC)
Interface definition for different SSH backends.
copy_file_from_remote(self, remote, remote_path, local_path)
Copies a remote file or directory to the local file system.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine from which a file should be copied. | required |
remote_path | pathlib.Path | Remote path of the file which should be copied. | required |
local_path | pathlib.Path | Local path to which the file should be copied. | required |
Source code in bandsaw/advices/ssh.py
def copy_file_from_remote(self, remote, remote_path, local_path):
    """
    Copies a remote file or directory to the local file system.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine from which a file
            should be copied.
        remote_path (pathlib.Path): Remote path of the file which should be
            copied.
        local_path (pathlib.Path): Local path to the file where it should be
            copied to.
    """
copy_file_to_remote(self, remote, local_path, remote_path)
Copies a local file or directory to a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine to which a file should be copied. | required |
local_path | pathlib.Path | Local path of the file which should be copied over. | required |
remote_path | pathlib.Path | Remote path to which the file should be copied. | required |
Source code in bandsaw/advices/ssh.py
def copy_file_to_remote(self, remote, local_path, remote_path):
    """
    Copies a local file or directory to a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine to which a file should
            be copied.
        local_path (pathlib.Path): Local path to the file which should be copied
            over.
        remote_path (pathlib.Path): Remote path of the file where it should be
            copied to.
    """
create_dir(self, remote, remote_path)
Create a directory on a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine where a directory will be created. | required |
remote_path | str | Remote path to the directory that should be created. | required |
Source code in bandsaw/advices/ssh.py
def create_dir(self, remote, remote_path):
    """
    Create a directory on a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine where a directory will
            be created.
        remote_path (str): Remote path to the directory that should be created.
    """
delete_dir(self, remote, remote_path)
Delete a directory on a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine where a directory will be deleted. | required |
remote_path | str | Remote path to the directory that should be deleted. | required |
Source code in bandsaw/advices/ssh.py
def delete_dir(self, remote, remote_path):
    """
    Delete a directory on a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine where a directory will
            be deleted.
        remote_path (str): Remote path to the directory that should be deleted.
    """
execute_remote(self, remote, executable, *arguments)
Executes an executable on a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine where `executable` will be executed. | required |
executable | str | Remote path of the executable which should be executed. | required |
*arguments | str | Positional arguments which are the command line parameters for the `executable`. | () |
Exceptions:
Type | Description |
---|---|
subprocess.CalledProcessError | If the remote process ends with an error. Its return code will be available through the exception. |
Source code in bandsaw/advices/ssh.py
def execute_remote(self, remote, executable, *arguments):
    """
    Executes an executable on a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine where `executable`
            will be executed.
        executable (str): Remote path of the executable which should be executed.
        *arguments (str): Positional arguments which are the command line
            parameter for the `executable`.

    Raises:
        subprocess.CalledProcessError: If the remote process ends with an error.
            Its return code will be available through the exception.
    """
SshCommandLineBackend (SshBackend)
SSH backend that uses the SSH command line tools.
copy_file_from_remote(self, remote, remote_path, local_path)
Copies a remote file or directory to the local file system.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine from which a file should be copied. | required |
remote_path | pathlib.Path | Remote path of the file which should be copied. | required |
local_path | pathlib.Path | Local path to which the file should be copied. | required |
Source code in bandsaw/advices/ssh.py
def copy_file_from_remote(self, remote, remote_path, local_path):
    copy_source = self.get_remote_path(remote, remote_path)
    self._run(
        [
            'scp',
            '-P',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            copy_source,
            str(local_path),
        ],
    )
copy_file_to_remote(self, remote, local_path, remote_path)
Copies a local file or directory to a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine to which a file should be copied. | required |
local_path | pathlib.Path | Local path of the file which should be copied over. | required |
remote_path | pathlib.Path | Remote path to which the file should be copied. | required |
Source code in bandsaw/advices/ssh.py
def copy_file_to_remote(self, remote, local_path, remote_path):
    copy_destination = self.get_remote_path(remote, remote_path)
    self._run(
        [
            'scp',
            '-P',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            str(local_path),
            copy_destination,
        ],
    )
create_dir(self, remote, remote_path)
Create a directory on a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine where a directory will be created. | required |
remote_path | str | Remote path to the directory that should be created. | required |
Source code in bandsaw/advices/ssh.py
def create_dir(self, remote, remote_path):
    return self._run(
        [
            'ssh',
            '-p',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            self.login(remote),
            'mkdir',
            '-p',
            str(remote_path),
        ]
    )
delete_dir(self, remote, remote_path)
Delete a directory on a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine where a directory will be deleted. | required |
remote_path | str | Remote path to the directory that should be deleted. | required |
Source code in bandsaw/advices/ssh.py
def delete_dir(self, remote, remote_path):
    return self._run(
        [
            'ssh',
            '-p',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            self.login(remote),
            'rm',
            '-Rf',
            str(remote_path),
        ]
    )
execute_remote(self, remote, executable, *arguments)
Executes an executable on a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine where `executable` will be executed. | required |
executable | str | Remote path of the executable which should be executed. | required |
*arguments | str | Positional arguments which are the command line parameters for the `executable`. | () |
Exceptions:
Type | Description |
---|---|
subprocess.CalledProcessError | If the remote process ends with an error. Its return code will be available through the exception. |
Source code in bandsaw/advices/ssh.py
def execute_remote(self, remote, executable, *arguments):
    return self._run(
        [
            'ssh',
            '-p',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            self.login(remote),
            str(executable),
        ]
        + list(arguments),
    )
get_remote_path(self, remote, path)
Returns the remote path in the form `<user>@<host>:<path>`.
Source code in bandsaw/advices/ssh.py
def get_remote_path(self, remote, path):
    """Returns the remote path in form of <user>@<host>:<path>"""
    return f"{self.login(remote)}:{path.absolute()}"
login(remote) (static method)
Returns the destination of the remote in the form `<user>@<host>`.
Source code in bandsaw/advices/ssh.py
@staticmethod
def login(remote):
    """Returns the destination of the remote in form of <user>@<host>"""
    return f"{remote.user}@{remote.host}"
subprocess
Contains an `Advice` implementation that runs the execution in a subprocess.
SubprocessAdvice (Advice)
Advice that runs the execution in a subprocess.
__init__(self, directory=None, interpreter=None)
Create a new instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
directory | str | The directory where temporary files are stored to exchange data between both processes. If `None`, a temporary directory is used. | None |
interpreter | bandsaw.interpreter.Interpreter | The interpreter to use in the subprocess. If `None`, the same interpreter will be used. | None |
Source code in bandsaw/advices/subprocess.py
def __init__(self, directory=None, interpreter=None):
    """
    Create a new instance.

    Args:
        directory (str): The directory where temporary files are stored to
            exchange data between both processes. If `None` a temporary directory
            is used.
        interpreter (bandsaw.interpreter.Interpreter): The interpreter to use in
            the subprocess. If `None` the same interpreter will be used.
    """
    if directory is None:
        self.directory = pathlib.Path(tempfile.mkdtemp())
    else:
        self.directory = pathlib.Path(directory)
    logger.info("Using directory %s", self.directory)
    self.interpreter = interpreter or Interpreter()
    super().__init__()
after(self, session)
Called after the task is actually executed.
Logs the id of the current process and the result of the task execution, then returns without calling `session.proceed()` or `session.conclude()`. Unlike the general contract of `Advice.after()`, this is intentional: as the log messages state, returning ends the session in the subprocess, and the session continues in the parent process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
Source code in bandsaw/advices/subprocess.py
def after(self, session):
    logger.info("after called in process %d", os.getpid())
    logger.info("Sub process created result %s", session.result)
    logger.info("Returning to end session and continue in parent")
before(self, session)
Called before the task is actually executed.
Writes the session to a temporary file in `directory` and continues it in a subprocess, which runs the distribution archive with the configured interpreter and with `PYTHONPATH` set from the interpreter's path. After the subprocess exits, the session is restored from the output file it wrote, and `session.proceed()` is called in the parent process. If the subprocess exits with a non-zero code, `subprocess.check_call()` raises `subprocess.CalledProcessError`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
Source code in bandsaw/advices/subprocess.py
def before(self, session):
    logger.info("before called in process %d", os.getpid())
    session_id = session.execution.execution_id
    session_in_file, session_in_path = tempfile.mkstemp(
        '.zip', 'in-' + session_id + '-', self.directory
    )
    session_out_file, session_out_path = tempfile.mkstemp(
        '.zip', 'out-' + session_id + '-', self.directory
    )
    archive_path = session.distribution_archive.path
    logger.info("Writing session to %s", session_in_path)
    with io.FileIO(session_in_file, mode='w') as stream:
        session.save(stream)
    logger.info(
        "Continue session in subprocess using interpreter %s and "
        "distribution archive %s",
        self.interpreter.executable,
        archive_path,
    )
    environment = self.interpreter.environment
    environment['PYTHONPATH'] = ':'.join(self.interpreter.path)
    subprocess.check_call(
        [
            self.interpreter.executable,
            archive_path,
            '--input',
            session_in_path,
            '--output',
            session_out_path,
            '--run-id',
            session.run_id,
        ],
        env=environment,
    )
    logger.info("Sub process exited")
    logger.info("Reading session from %s", session_out_path)
    with io.FileIO(session_out_file, mode='r') as stream:
        session.restore(stream)
    logger.info("proceed() session in parent process")
    session.proceed()
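`before()` sets `PYTHONPATH` by joining the interpreter's path with `':'` on whatever object `self.interpreter.environment` returns. A slightly more portable variant, sketched here as a hypothetical helper that is not part of bandsaw, joins the entries with `os.pathsep` (`';'` on Windows) and works on a copy, so it cannot modify the interpreter's environment in case that property returns a shared dictionary.

```python
import os
from typing import Dict, Iterable, Mapping, Optional


def subprocess_environment(
    python_path: Iterable[str], base: Optional[Mapping[str, str]] = None
) -> Dict[str, str]:
    """Hypothetical helper building the environment for the child interpreter."""
    # Copy the base environment so the caller's mapping is never modified.
    environment = dict(os.environ if base is None else base)
    # os.pathsep is ':' on POSIX and ';' on Windows.
    environment['PYTHONPATH'] = os.pathsep.join(python_path)
    return environment


env = subprocess_environment(['/opt/project/src', '/opt/project/libs'])
print(env['PYTHONPATH'])  # /opt/project/src:/opt/project/libs on POSIX
```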
config
Contains the class and functions to configure bandsaw.
Configuration
Class that represents a configuration for bandsaw.
add_advice_chain(self, *advices, name='default')
Add a new advice chain to the configuration.
Each advice chain has a unique `name`. If multiple chains with the same name are added to the configuration, the last chain overwrites all previous chains.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*advices | bandsaw.advice.Advice | A tuple of advices for this chain. | () |
name | str | The name of the advice chain, defaults to 'default' if not specified. | 'default' |
Returns:
Type | Description |
---|---|
bandsaw.config.Configuration | The configuration to which the chain was added. |
Source code in bandsaw/config.py
def add_advice_chain(self, *advices, name='default'):
    """
    Add a new advice chain to the configuration.

    Each advice chain has a unique `name`. If multiple chains with the same name
    are added to the configuration, the last chain overwrites all previous chains.

    Args:
        *advices (bandsaw.advice.Advice): A tuple of advices for this chain.
        name (str): The name of the advice chain, defaults to 'default' if not
            specified.

    Returns:
        bandsaw.config.Configuration: The configuration to which the chain was
            added.
    """
    self._advice_chains[name] = advices
    return self
add_extension(self, extension)
Add an `Extension` to the configuration.
Extensions are objects that can implement callbacks to be informed by bandsaw about certain events, e.g. the creation of new tasks or the final result of an execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
extension | bandsaw.extension.Extension | An object implementing the `Extension` interface. | required |
Returns:
Type | Description |
---|---|
bandsaw.config.Configuration | The configuration to which the extension was added. |
Source code in bandsaw/config.py
def add_extension(self, extension):
    """
    Add an `Extension` to the configuration.

    `Extensions` are objects that can implement callbacks to be informed by
    bandsaw about certain conditions, e.g. the creation of new tasks or the final
    result of an execution.

    Args:
        extension (bandsaw.extension.Extension): An object implementing the
            `Extension`.

    Returns:
        bandsaw.config.Configuration: The configuration to which the extension was
            added.
    """
    self.extensions.append(extension)
    return self
add_modules_for_distribution(self, *modules)
Add modules that should be included in the distribution archive.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*modules | str | Names of the modules which should be included in the distribution archive, passed as positional arguments. | () |
Returns:
Type | Description |
---|---|
bandsaw.config.Configuration | The configuration with the added modules. |
Source code in bandsaw/config.py
def add_modules_for_distribution(self, *modules):
    """
    Add modules that should be included in the distribution archive.

    Args:
        *modules (str): Positional arguments with the names of modules which
            should be included in the distribution archive.

    Returns:
        bandsaw.config.Configuration: The configuration with the added modules.
    """
    self.distribution_modules.extend(modules)
    return self
get_advice_chain(self, name)
Returns the advice chain with the given name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the wanted advice chain. | required |
Returns:
Type | Description |
---|---|
List[bandsaw.advice.Advice] | The advice chain with the given name. |
Exceptions:
Type | Description |
---|---|
KeyError | If no chain with the specified name is configured. |
Source code in bandsaw/config.py
def get_advice_chain(self, name):
"""
Returns the advice chain with the given name.
Args:
name (str): Name of the wanted advice chain.
Returns:
List[bandsaw.advice.Advice]: The advice chain with the given name.
Raises:
KeyError: If no chain with the specified name is configured.
"""
return self._advice_chains.get(name)
set_serializer(self, serializer)
Sets the serializer which defines how tasks and results will be serialized.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
serializer |
bandsaw.serialization.Serializer |
The serializer to use for serializing objects. |
required |
Returns:
Type | Description |
---|---|
bandsaw.config.Configuration |
The configuration on which the serializer was set. |
Source code in bandsaw/config.py
def set_serializer(self, serializer):
"""
Sets the serializer which defines how tasks and results will be serialized.
Args:
serializer (bandsaw.serialization.Serializer): The serializer to use for
serializing objects.
Returns:
bandsaw.config.Configuration: The configuration on which the serializer was
set.
"""
self.serializer = serializer
return self
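Because `add_extension`, `add_modules_for_distribution` and `set_serializer` all return the configuration itself, calls can be chained. The following is a minimal sketch of that fluent style. `SketchConfiguration` is a hypothetical stand-in that only mirrors the three methods shown above, not the real `bandsaw.config.Configuration`, and the string arguments are placeholders for real extension and serializer objects.

```python
class SketchConfiguration:
    """Hypothetical stand-in for bandsaw.config.Configuration (illustration only)."""

    def __init__(self):
        self.extensions = []
        self.distribution_modules = []
        self.serializer = None

    def add_extension(self, extension):
        self.extensions.append(extension)
        return self  # returning self is what enables chaining

    def add_modules_for_distribution(self, *modules):
        self.distribution_modules.extend(modules)
        return self

    def set_serializer(self, serializer):
        self.serializer = serializer
        return self


# Placeholder strings stand in for real Extension and Serializer objects.
configuration = (
    SketchConfiguration()
    .add_extension("my-extension")
    .add_modules_for_distribution("numpy", "my_helpers")
    .set_serializer("my-serializer")
)
```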
get_configuration(configuration_module=None)
Return a configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
configuration_module |
str |
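The module name of a module which contains the configuration. The module needs to define a member `configuration`, which contains an instance of `Configuration`. If no module name is given, a default configuration is returned based on the value of the `BANDSAW_CONFIG_MODULE` environment variable. If this variable is not set, we default to 'bandsaw_config'. |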
None |
Returns:
Type | Description |
---|---|
bandsaw.config.Configuration |
The configuration. |
Exceptions:
Type | Description |
---|---|
ModuleNotFoundError |
If no module exists with name `configuration_module`. |
LookupError |
If the module doesn't contain a variable `configuration`. |
TypeError |
If the variable `configuration` is not of type `Configuration`. |
Source code in bandsaw/config.py
def get_configuration(configuration_module=None):
"""
Return a configuration.
Args:
configuration_module (str): The module name of a module, which contains the
configuration. The module needs to define a member 'configuration', which
contains an instance of `Configuration`. If no module name is given, a
default configuration is returned based on the value of the
`BANDSAW_CONFIG_MODULE` environment variable. If this variable is not set,
we default to 'bandsaw_config'.
Returns:
bandsaw.config.Configuration: The configuration.
Raises:
ModuleNotFoundError: If no module exists with name `configuration_module`.
LookupError: If the module doesn't contain a variable `configuration`.
TypeError: If the variable `configuration` is not of type `Configuration`.
"""
if configuration_module is None:
default_configuration_module_name = os.getenv(
CONFIGURATION_MODULE_ENV_VARIABLE,
CONFIGURATION_MODULE_DEFAULT,
)
configuration_module = default_configuration_module_name
if configuration_module not in _configurations:
try:
_load_configuration_module(configuration_module)
except ModuleNotFoundError:
logger.warning(
"No module found for config %s",
configuration_module,
)
raise
return _configurations[configuration_module]
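The lookup steps described above (environment variable fallback, caching per module name, and the three documented errors) can be sketched with the standard library alone. `sketch_get_configuration` and its `expected_type` parameter are hypothetical; `expected_type` stands in for the real check against `Configuration`, and a plain dict plays the role of the configuration object.

```python
import importlib
import os

# Environment variable and default module name taken from the docstring above.
ENV_VARIABLE = "BANDSAW_CONFIG_MODULE"
DEFAULT_MODULE = "bandsaw_config"

_cache = {}


def sketch_get_configuration(configuration_module=None, expected_type=object):
    """Hypothetical re-creation of the lookup described for get_configuration."""
    if configuration_module is None:
        configuration_module = os.getenv(ENV_VARIABLE, DEFAULT_MODULE)
    if configuration_module not in _cache:
        # Raises ModuleNotFoundError if no module with this name exists.
        module = importlib.import_module(configuration_module)
        if not hasattr(module, "configuration"):
            raise LookupError(f"{configuration_module} has no 'configuration'")
        value = module.configuration
        if not isinstance(value, expected_type):
            raise TypeError("'configuration' has the wrong type")
        _cache[configuration_module] = value
    return _cache[configuration_module]
```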
context
Classes that represent the context used in advising tasks.
Context (SerializableValue)
Class for representing the context for advising tasks.
The context consists of a set of arbitrary key-value mappings that can be used
by the `Advice` classes to store state or communicate with other advices.
attributes
property
readonly
A set of arbitrary key-value mappings for the `Advice` classes. An `Advice`
can add to this mapping and use it as a way of keeping state.
deserialize(values)
classmethod
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/context.py
@classmethod
def deserialize(cls, values):
return Context(values['attributes'])
serialized(self)
Returns a serializable representation of the value.
Source code in bandsaw/context.py
def serialized(self):
data = {
'attributes': self._attributes,
}
return data
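`serialized()` and `deserialize()` form a round trip: the first produces plain data, the second rebuilds an equivalent object from it. The sketch below shows that round trip for a context whose attribute mapping was extended by an advice. `SketchContext` is a hypothetical stand-in for `bandsaw.context.Context`, and plain `json` is used only for illustration; bandsaw itself uses the configured serializer.

```python
import json


class SketchContext:
    """Hypothetical stand-in mirroring Context.serialized()/deserialize()."""

    def __init__(self, attributes=None):
        self._attributes = dict(attributes or {})

    @property
    def attributes(self):
        # Read-only property, but the returned mapping itself is mutable.
        return self._attributes

    def serialized(self):
        return {'attributes': self._attributes}

    @classmethod
    def deserialize(cls, values):
        return cls(values['attributes'])


context = SketchContext()
context.attributes['retries'] = 2           # e.g. an advice storing state
payload = json.dumps(context.serialized())  # '{"attributes": {"retries": 2}}'
restored = SketchContext.deserialize(json.loads(payload))
```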
decorator
Contains decorators that allow defining individual tasks.
task(*task_args, config=None, chain=None, **task_kwargs)
Decorator that is used to define a function as a task.
The decorator can be used in two different ways, standalone:
Examples:
>>> @task
... def my_task_function():
... pass
or with additional configuration.
Examples:
>>> @task(config='my.config')
... def my_task_function():
... pass
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
str |
The name of the configuration module to use for this task. If not given, the default configuration is used. |
None |
chain |
str |
The name of the advice chain to use for advising this task. If not given, 'default' is used. |
None |
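*task_args |
Positional args given to the decorator OR the decorated function. If the decorator is used WITHOUT providing additional configuration, `task_args` contains a tuple with a single item that is the function to be used as a task. If there is additional configuration given, `task_args` contains the positional arguments of the call of the decorator. |
() |
**task_kwargs |
Keyword args given to the decorator. If the decorator is used WITHOUT providing additional configuration, `task_kwargs` is an empty dictionary. If there is additional configuration given, `task_kwargs` contains the keyword arguments of the call of the decorator. |
{} |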
Returns:
Type | Description |
---|---|
Callable |
Returns a callable that wraps the decorated function. |
Exceptions:
Type | Description |
---|---|
ModuleNotFoundError |
If the configured configuration module does not exist. |
ValueError |
If the specified advice chain does not exist. |
RuntimeError |
If the task is configured with multiple positional arguments. |
Source code in bandsaw/decorator.py
def task(*task_args, config=None, chain=None, **task_kwargs):
"""
Decorator that is used to define a function as a task.
The decorator can be used in two different ways, standalone:
Example:
>>> @task
... def my_task_function():
... pass
or with additional configuration.
Example:
>>> @task(config='my.config')
... def my_task_function():
... pass
Args:
config (str): The name of the configuration module to use for this task.
If not given, the default configuration is used.
chain (str): The name of the advice chain to use for advising this task.
If not given, 'default' is used.
*task_args: Positional args given to the decorator OR the decorated function.
If the decorator is used WITHOUT providing additional configuration,
`task_args` contains a tuple with a single item that is the function to be
used as a task. If there is additional configuration given, `task_args`
contains the positional arguments of the call of the decorator.
**task_kwargs: Keyword args given to the decorator.
If the decorator is used WITHOUT providing additional configuration,
`task_kwargs` is an empty dictionary. If there is additional configuration
given, `task_kwargs` contains the keyword arguments of the call of the
decorator.
Returns:
Callable: Returns a callable that wraps the decorated function.
Raises:
ModuleNotFoundError: If the configured configuration module does not exist.
ValueError: If the specified advice chain does not exist.
RuntimeError: If the task is configured with multiple positional arguments.
"""
config_module = config
configuration = get_configuration(config_module)
chain_name = chain or 'default'
chain = configuration.get_advice_chain(chain_name)
if chain is None:
raise ValueError(f"Unknown advice chain {chain_name}")
def decorate_function(func):
logger.info("Decorate function '%s'", func)
logger.info("Creating task for function '%s'", func)
the_task = Task.create_task(func, task_kwargs)
def inner(*func_args, **func_kwargs):
execution_id = _calculate_execution_id(
func_args,
func_kwargs,
configuration.serializer,
)
execution = Execution(execution_id, func_args, func_kwargs)
result = advise_task_with_chain(
the_task,
execution,
configuration,
chain_name,
)
if result.exception:
raise result.exception
return result.value
inner.__wrapped__ = func
inner.bandsaw_task = the_task
inner.bandsaw_configuration = configuration
return inner
if len(task_args) == 1 and len(task_kwargs) == 0:
return decorate_function(task_args[0])
# Keyword-only `config`/`chain` never land in `task_kwargs`, so `@task(config=...)` has both empty.
if len(task_args) == 0:
return decorate_function
# This shouldn't happen if the decorator is properly used.
raise RuntimeError("Invalid 'task' decorator.")
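The dual use of the decorator (bare `@task` versus `@task(...)`) comes down to inspecting how it was called. The sketch below reproduces only that dispatch with a hypothetical `sketch_task` decorator: instead of loading a configuration and advising the call, it records its options and calls the function directly. It checks only `task_args` when deciding how it was called, because keyword-only parameters such as `config` and `chain` are never collected into `**task_kwargs`.

```python
import functools


def sketch_task(*task_args, config=None, chain=None, **task_kwargs):
    """Hypothetical decorator mirroring the call dispatch of `task` above."""

    def decorate_function(func):
        @functools.wraps(func)  # also sets inner.__wrapped__ = func
        def inner(*func_args, **func_kwargs):
            # The real decorator advises the call here; the sketch just calls.
            return func(*func_args, **func_kwargs)

        # Record the options instead of loading a configuration.
        inner.sketch_options = {"config": config, "chain": chain or "default", **task_kwargs}
        return inner

    if len(task_args) == 1 and len(task_kwargs) == 0:
        # Bare `@sketch_task`: the single positional argument is the function.
        return decorate_function(task_args[0])
    if len(task_args) == 0:
        # `@sketch_task(...)`: return the decorator that receives the function.
        return decorate_function
    raise RuntimeError("Invalid 'task' decorator.")


@sketch_task
def add(a, b):
    return a + b


@sketch_task(config="my.config", chain="remote")
def multiply(a, b):
    return a * b
```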
distribution
Contains functions for creating distribution archives.
Distribution archives are how bandsaw transfers code between different machines. They are normal zip files that contain bandsaw itself, a main module which allows executing the archive and continuing sessions, and possibly some additional dependencies.
DistributionArchive
Class that represents a distribution archive.
A distribution archive contains all the code necessary for running a task. It can be used for running a task on a different machine by copying over the archive.
Attributes:
Name | Type | Description |
---|---|---|
path |
pathlib.Path |
The path to the file containing the code. |
path
property
readonly
Returns:
Type | Description |
---|---|
pathlib.Path |
The path to the archive file. The file itself is created lazily when the path is accessed for the first time, so the archive is only created if it is actually needed. |
get_distribution_archive(configuration)
Returns a distribution archive for a given configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
configuration |
bandsaw.config.Configuration |
The configuration for which the distribution package should be returned. |
required |
Returns:
Type | Description |
---|---|
bandsaw.distribution.DistributionArchive |
The archive for the configuration. |
Source code in bandsaw/distribution.py
def get_distribution_archive(configuration):
"""
Returns a distribution archive for a given configuration.
Args:
configuration (bandsaw.config.Configuration): The configuration for which the
distribution package should be returned.
Returns:
bandsaw.distribution.DistributionArchive: The archive for the configuration.
"""
archive = _CACHE.get_archive(configuration)
if archive is None:
archive_path = pathlib.Path(
tempfile.mktemp(suffix='.pyz', prefix='distribution-')
)
modules = [
'__main__',
'bandsaw',
configuration.module_name,
*configuration.distribution_modules,
]
archive = DistributionArchive(archive_path, *modules)
_CACHE.put_archive(configuration, archive)
return archive
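Two ideas from this section can be tried with the standard library alone: a zip file with a top-level `__main__.py` can be executed directly by the Python interpreter, and the archive file is only built when `path` is first accessed. The sketch below combines both in a hypothetical `SketchArchive` class. It writes only a `__main__.py`, whereas the real archive bundles `__main__`, `bandsaw`, the configuration module and any distribution modules. It uses `tempfile.mkdtemp` because the standard library documentation deprecates `tempfile.mktemp` due to the race between picking a name and creating the file.

```python
import functools
import pathlib
import subprocess
import sys
import tempfile
import zipfile


class SketchArchive:
    """Hypothetical stand-in: builds an executable zip on first `path` access."""

    def __init__(self, main_source):
        self._main_source = main_source
        self.build_count = 0

    @functools.cached_property
    def path(self):
        directory = pathlib.Path(tempfile.mkdtemp(prefix='distribution-'))
        archive_path = directory / 'archive.pyz'
        with zipfile.ZipFile(archive_path, 'w') as archive:
            # A top-level __main__.py makes the zip runnable as `python archive.pyz`.
            archive.writestr('__main__.py', self._main_source)
        self.build_count += 1
        return archive_path


archive = SketchArchive("print('hello from the archive')")
output = subprocess.run(
    [sys.executable, str(archive.path)], capture_output=True, text=True, check=True
).stdout
```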
execution
Contains classes and functions around an execution of a task.
Execution (SerializableValue)
Class that defines an execution of a `Task`.
It contains the arguments that should be used for the task and a unique identifier derived from those arguments.
Attributes:
Name | Type | Description |
---|---|---|
execution_id |
str |
A string identifying this execution. |
args |
tuple[Any] |
The positional arguments for the task to use in this execution. |
kwargs |
Dict[Any,Any] |
The keyword arguments for the task to use in this execution. |
deserialize(values)
classmethod
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/execution.py
@classmethod
def deserialize(cls, values):
return Execution(
values['execution_id'],
values['args'],
values['kwargs'],
)
serialized(self)
Returns a serializable representation of the value.
Source code in bandsaw/execution.py
def serialized(self):
return {
'execution_id': self.execution_id,
'args': self.args,
'kwargs': self.kwargs,
}
extensions
Contains an API for extensions that can be used in bandsaw.
Extension
Class that defines the interface of extensions.
An extension can define callbacks that are called by bandsaw. This allows
extending existing functionality (e.g. by setting additional values in a
context before it is handled by all advices) or integrating other systems.
Unlike an `Advice`, an `Extension` is defined globally in a configuration and
therefore applies to all tasks.
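As a sketch of how the three callbacks can cooperate, the hypothetical `TimingExtension` below stores a start time in the context during `on_before_advice` and reads it back in `on_after_advice`. A real extension would presumably subclass `bandsaw.extensions.Extension` and be registered with `Configuration.add_extension()`; here the callbacks are driven by hand, and a `SimpleNamespace` with an `attributes` dict stands in for `bandsaw.context.Context`.

```python
import time
from types import SimpleNamespace


class TimingExtension:
    """Hypothetical extension that records how long advising each task takes."""

    def __init__(self):
        self.configuration = None
        self.durations = []

    def on_init(self, configuration):
        self.configuration = configuration

    def on_before_advice(self, task, execution, context):
        # Keep state in the context's attribute mapping for the later callback.
        context.attributes['timing.start'] = time.perf_counter()

    def on_after_advice(self, task, execution, context, result):
        started = context.attributes['timing.start']
        self.durations.append(time.perf_counter() - started)


# Drive the callbacks by hand: init once, then before/after one advised task.
extension = TimingExtension()
extension.on_init(configuration="demo-config")
context = SimpleNamespace(attributes={})
extension.on_before_advice(task=None, execution=None, context=context)
extension.on_after_advice(task=None, execution=None, context=context, result=None)
```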
on_after_advice(self, task, execution, context, result)
Called after bandsaw advises a task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
bandsaw.tasks.Task |
The task which was advised. |
required |
execution |
bandsaw.execution.Execution |
The execution which contains the parametrization of the task. |
required |
context |
bandsaw.context.Context |
The context which was used during the advice. |
required |
result |
bandsaw.result.Result |
The result of the call. |
required |
Source code in bandsaw/extensions.py
def on_after_advice(self, task, execution, context, result):
"""
Called after bandsaw advises a task.
Args:
task (bandsaw.tasks.Task): The task which was advised.
execution (bandsaw.execution.Execution): The execution which contains the
parametrization of the task.
context (bandsaw.context.Context): The context which was used during the
advice.
result (bandsaw.result.Result): The result of the call.
"""
on_before_advice(self, task, execution, context)
Called before bandsaw advises a task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
bandsaw.tasks.Task |
The task which will be advised. |
required |
execution |
bandsaw.execution.Execution |
The execution which contains the parametrization of the task. |
required |
context |
bandsaw.context.Context |
The context which will be used during the advice. The context can be extended by the extension. |
required |
Source code in bandsaw/extensions.py
def on_before_advice(self, task, execution, context):
"""
Called before bandsaw advises a task.
Args:
task (bandsaw.tasks.Task): The task which will be advised.
execution (bandsaw.execution.Execution): The execution which contains the
parametrization of the task.
context (bandsaw.context.Context): The context which will be used during
the advice. The context can be extended by the extension.
"""
on_init(self, configuration)
Called when a bandsaw configuration has been initialized.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
configuration |
bandsaw.config.Configuration |
The configuration object which contains the config that has been loaded. |
required |
Source code in bandsaw/extensions.py
def on_init(self, configuration):
"""
Called when a bandsaw configuration has been initialized.
Args:
configuration (bandsaw.config.Configuration): The configuration object
which contains the config that has been loaded.
"""
identifier
Functions for generating identifiers for arbitrary python objects.
identifier_from_bytes(buffer)
Derive an identifier from a byte buffer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
buffer |
Union[bytes,bytearray] |
The binary data from which to derive an identifier. |
required |
Returns:
Type | Description |
---|---|
str |
The identifier in form of a string of a hexadecimal number. |
Source code in bandsaw/identifier.py
def identifier_from_bytes(buffer):
"""
Derive an identifier from a bytebuffer.
Args:
buffer (Union[bytes,bytearray]): The binary data from which to derive an
identifier.
Returns:
str: The identifier in form of a string of a hexadecimal number.
"""
identifier = hashlib.sha256(buffer).hexdigest()[:_ID_LENGTH]
return identifier
identifier_from_string(string)
Derive an identifier from a string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
string |
str |
The string from which to derive an identifier. |
required |
Returns:
Type | Description |
---|---|
str |
The identifier in form of a string of a hexadecimal number. |
Source code in bandsaw/identifier.py
def identifier_from_string(string):
"""
Derive an identifier from a string.
Args:
string (str): The string from which to derive an identifier.
Returns:
str: The identifier in form of a string of a hexadecimal number.
"""
identifier = identifier_from_bytes(string.encode('utf-8'))
return identifier
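Both functions reduce to a truncated SHA-256 hex digest. The sketch below mirrors them and adds a hypothetical `sketch_execution_id` to illustrate why deterministic input bytes matter when deriving an id from task arguments. `ID_LENGTH = 16` is an assumed value (the real `_ID_LENGTH` is not shown here), and hashing canonical JSON is only an assumption about how `_calculate_execution_id`, whose body is not shown, might work.

```python
import hashlib
import json

ID_LENGTH = 16  # hypothetical; the real _ID_LENGTH value is not shown here


def identifier_from_bytes(buffer):
    # Truncated hex digest of SHA-256, as in the function above.
    return hashlib.sha256(buffer).hexdigest()[:ID_LENGTH]


def identifier_from_string(string):
    return identifier_from_bytes(string.encode('utf-8'))


def sketch_execution_id(args, kwargs):
    # Hypothetical: canonical JSON (sorted keys, compact separators) so that
    # equal arguments always produce identical bytes and thus the same id.
    canonical = json.dumps([list(args), kwargs], sort_keys=True, separators=(',', ':'))
    return identifier_from_string(canonical)
```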
interpreter
Contains classes regarding python interpreters
Interpreter (SerializableValue)
Class for representing different python interpreters.
This class holds the information about specific Python interpreters that are used within the library. To support multiple different interpreters, there will be an option to define the interpreter as part of the configuration. Currently, only a single interpreter is defined automatically.
environment
property
readonly
The environment variables to be set for the interpreter.
path
property
readonly
The python path items that will be used.
__init__(self, path=None, executable=None)
special
Create a new interpreter instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
List[str] |
A list of directory paths, to be used as $PYTHONPATH. If `None` the current `sys.path` is used. |
None |
executable |
str |
The path to the python executable for this interpreter. If `None` the current `sys.executable` is used. |
None |
Source code in bandsaw/interpreter.py
def __init__(
self,
path=None,
executable=None,
):
"""
Create a new interpreter instance.
Args:
path (List[str]): A list of directory paths, to be used as $PYTHONPATH.
If `None` the current `sys.path` is used.
executable (str): The path to the python executable for this interpreter.
If `None` the current `sys.executable` is used.
"""
if path is None:
self._path = tuple(sys.path)
else:
self._path = tuple(path)
self.executable = executable or sys.executable
self._environment = {}
deserialize(values)
classmethod
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/interpreter.py
@classmethod
def deserialize(cls, values):
return Interpreter(
path=values['path'],
executable=values['executable'],
).set_environment(**values['environment'])
serialized(self)
Returns a serializable representation of the value.
Source code in bandsaw/interpreter.py
def serialized(self):
return {
'path': self._path,
'executable': self.executable,
'environment': self._environment,
}
set_environment(self, **environment)
Set the environment variables to use for this interpreter.
A call to this method overwrites all variables that have been set previously.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**environment |
Arbitrary keyword arguments where the name of the keyword corresponds to the name of the environment variable and the values will be the values set in the environment. |
{} |
Source code in bandsaw/interpreter.py
def set_environment(self, **environment):
"""
Set the environment variables to use for this interpreter.
A call to this method overwrites all variables that have been set previously.
Args:
**environment: Arbitrary keyword arguments where the name of the keyword
corresponds to the name of the environment variable and the values
will be the values set in the environment.
"""
self._environment = environment
return self
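The three pieces of interpreter information (python path, executable, environment) map naturally onto launching a subprocess. The hypothetical `run_with_interpreter` helper below is only an assumption about how such values could be used, not bandsaw's actual launch code: the path items become `PYTHONPATH`, joined with the platform separator, and the extra variables are layered on top of the current environment.

```python
import os
import subprocess
import sys


def run_with_interpreter(code, path=None, executable=None, environment=None):
    """Hypothetical helper: run `code` using interpreter-style settings."""
    # Same defaults as Interpreter.__init__: current sys.path / sys.executable.
    path = tuple(sys.path if path is None else path)
    executable = executable or sys.executable
    env = dict(os.environ)
    env.update(environment or {})
    # Path items are joined with the platform's separator (':' or ';').
    env['PYTHONPATH'] = os.pathsep.join(item for item in path if item)
    completed = subprocess.run(
        [executable, '-c', code], env=env, capture_output=True, text=True, check=True
    )
    return completed.stdout
```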
io
Contains classes related to input/output of data.
BytearrayGeneratorToStream (RawIOBase)
Stream that reads from a generator that yields bytes/bytearrays.
read_stream_to_generator(stream, buffer_size=8192)
Read from a stream into a generator yielding `bytes`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
io.Stream |
The stream to read bytes from. |
required |
buffer_size |
int |
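The buffer size to read. Each individual `bytes` buffer returned by the generator has a maximum size of `buffer_size`. Defaults to `8192` if not set. |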
8192 |
Yields:
Type | Description |
---|---|
bytes |
A byte buffer of maximum size of `buffer_size`. |
Source code in bandsaw/io.py
def read_stream_to_generator(stream, buffer_size=8192):
"""
Read from a stream into a generator yielding `bytes`.
Args:
stream (io.Stream): The stream to read bytes from.
buffer_size (int): The buffer size to read. Each individual `bytes` buffer
returned by the generator has a maximum size of `buffer_size`. Defaults
to `8192` if not set.
Yields:
bytes: A byte buffer of maximum size of `buffer_size`.
"""
buffer = bytearray(buffer_size)
while True:
read = stream.readinto(buffer)
if read:
yield buffer[:read]
else:
break
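`read_stream_to_generator` and `BytearrayGeneratorToStream` convert in opposite directions between streams and generators. The source of `BytearrayGeneratorToStream` is not shown here, so the `GeneratorToStream` class below is a hypothetical sketch of one way to implement such a stream on top of `io.RawIOBase`, by overriding `readable()` and `readinto()`. The generator function mirrors the one above, except that it yields immutable `bytes` copies rather than `bytearray` slices.

```python
import io


def read_stream_to_generator(stream, buffer_size=8192):
    buffer = bytearray(buffer_size)
    while True:
        read = stream.readinto(buffer)
        if not read:
            break
        yield bytes(buffer[:read])  # copy, so the buffer can be reused


class GeneratorToStream(io.RawIOBase):
    """Hypothetical raw stream that reads from a generator of byte chunks."""

    def __init__(self, generator):
        self._generator = generator
        self._leftover = b''

    def readable(self):
        return True

    def readinto(self, target):
        # Fetch the next non-empty chunk; returning 0 signals end of stream.
        while not self._leftover:
            try:
                self._leftover = bytes(next(self._generator))
            except StopIteration:
                return 0
        size = min(len(target), len(self._leftover))
        target[:size] = self._leftover[:size]
        self._leftover = self._leftover[size:]
        return size


data = bytes(range(256)) * 40
chunks = list(read_stream_to_generator(io.BytesIO(data), buffer_size=1000))
```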
modules
Utility functions for handling python modules.
get_loaded_module_name_by_path(file_path)
Determine the name of an already loaded module by its file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str |
File path of the python file containing the module. |
required |
Returns:
Type | Description |
---|---|
str |
The name of the module, or `None` if the file isn't loaded as a module. |
Source code in bandsaw/modules.py
def get_loaded_module_name_by_path(file_path):
"""
Determine the name of an already loaded module by its file path.
Args:
file_path (str): File path of the python file containing the module.
Returns:
str: The name of the module, or `None` if the file isn't loaded as a module.
"""
real_path = os.path.realpath(file_path)
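for name, module in list(sys.modules.items()):
# Namespace packages have `__file__` set to None, so skip falsy values.
module_file = getattr(module, '__file__', None)
if module_file:
module_path = os.path.realpath(module_file)
if module_path == real_path:
return name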
return None
import_object(object_name, module_name)
Import a python object from a module.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
object_name |
str |
The name under which the object is defined in the module. |
required |
module_name |
str |
The name of the module from which the object should be imported. |
required |
Returns:
Type | Description |
---|---|
object |
The python object defined under the name. |
Exceptions:
Type | Description |
---|---|
AttributeError |
If nothing is defined with name `object_name` in the referenced module. |
ModuleNotFoundError |
If no module exists with the name `module_name`. |
Source code in bandsaw/modules.py
def import_object(object_name, module_name):
"""
Import a python object from a module.
Args:
object_name (str): The name under which the object is defined in the module.
module_name (str): The name of the module from which the object should be
imported.
Returns:
object: The python object defined under the name.
Raises:
AttributeError: If nothing is defined with name `object_name` in the
referenced module.
ModuleNotFoundError: If no module exists with the name `module_name`.
"""
module = importlib.import_module(module_name)
return getattr(module, object_name)
object_as_import(obj)
Returns the name and module of an object, which can be used for importing it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
object |
An arbitrary Python object. |
required |
Returns:
Type | Description |
---|---|
Tuple(str, str) |
Returns a tuple of the object name and the module name in which the object is defined. |
Exceptions:
Type | Description |
---|---|
ValueError |
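If `obj` doesn't have a name that can be directly imported, e.g. because it is defined within a local class. |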
Note
If `obj` is defined within the `__main__` script, the function tries to determine a name for the `__main__` module under which it could be imported from other scripts.
Source code in bandsaw/modules.py
def object_as_import(obj):
"""
Returns the name and module of an object, which can be used for importing it.
Args:
obj (object): An arbitrary Python object.
Returns:
Tuple(str, str): Returns a tuple of the object name and the module name in
which the object is defined.
Raises:
ValueError: If `obj` doesn't have a name that can be directly imported, e.g.
because it is defined within a local class.
Note:
If `obj` is defined within the `__main__` script, the function tries to
determine a name for the `__main__` module, under which it could be imported
from other scripts.
"""
object_name = obj.__name__
module_name = obj.__module__
if module_name == '__main__':
module_file_path = sys.modules['__main__'].__file__
module_name = _guess_module_name_by_path(module_file_path, sys.path)
if '<locals>' in obj.__qualname__:
raise ValueError("Can't import local functions.")
return object_name, module_name
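`object_as_import` and `import_object` are inverses: the first turns an object into a `(name, module)` pair, and the second imports the object back from that pair. The sketch below demonstrates the round trip with simplified hypothetical copies (`sketch_object_as_import` skips the `__main__` name guessing) applied to `json.dumps`, and shows that a function defined inside another function is rejected.

```python
import importlib
import json


def sketch_object_as_import(obj):
    """Simplified object_as_import: skips the guessing done for `__main__`."""
    if "<locals>" in obj.__qualname__:
        raise ValueError("Can't import local functions.")
    return obj.__name__, obj.__module__


def sketch_import_object(object_name, module_name):
    module = importlib.import_module(module_name)
    return getattr(module, object_name)


# Round trip: turn a function into (name, module) and import it back.
name, module_name = sketch_object_as_import(json.dumps)
restored = sketch_import_object(name, module_name)


def outer():
    def local():  # qualname is 'outer.<locals>.local'
        pass
    return local
```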
result
Contains code for representing the result of tasks.
Result (SerializableValue)
Class to encapsulate the result of a task execution.
Attributes:
Name | Type | Description |
---|---|---|
value |
Any |
The value that is returned by the task. |
exception |
Exception |
The exception that was raised during execution, |
deserialize(values)
classmethod
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/result.py
@classmethod
def deserialize(cls, values):
value = values["value"]
exception = values["exception"]
return Result(value=value, exception=exception)
serialized(self)
Returns a serializable representation of the value.
Source code in bandsaw/result.py
def serialized(self):
values = {
"value": self.value,
"exception": self.exception,
}
return values
run
Functions for managing the run id.
get_run_id()
Returns the run id.
The run id is a unique identifier that is specific to an individual run of a workflow. It stays the same across all task executions and can be used for tracking metrics and for differentiating between different runs of the same workflow, in which task_id and execution_id stay the same.
Returns:
Type | Description |
---|---|
str |
The unique run id. |
Source code in bandsaw/run.py
def get_run_id():
"""
Returns the run id.
The run id is a unique identifier that is specific to an individual run of a
workflow. It stays the same across all task executions and can be used for
tracking metrics and differentiating between different runs of the same workflow
where task_id and execution_id stay the same.
Returns:
str: The unique run id.
"""
if _RUN_ID is None:
set_run_id(str(uuid.uuid1()))
return _RUN_ID
set_run_id(run_id)
Sets the run id.
Setting the run id explicitly is usually not necessary. The function is mainly used when task executions are run in a different process to make sure the run id is consistent with the spawning process, but it can be used e.g. if an external system provides a unique identifier for a specific workflow run.
When `set_run_id(run_id)` is used, it must be called before the first tasks
are actually defined.
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the run id was already set before. |
Source code in bandsaw/run.py
def set_run_id(run_id):
"""
Sets the run id.
Setting the run id explicitly is usually not necessary. The function is mainly
used when task executions are run in a different process to make sure the run id
is consistent with the spawning process, but it can be used e.g. if an external
system provides a unique identifier for a specific workflow run.
When `set_run_id(run_id)` is being used, it must be run before the first tasks
are actually defined.
Raises:
RuntimeError: If the run id was already set before.
"""
global _RUN_ID # pylint: disable=global-statement
if _RUN_ID is not None:
logger.error("run_id already set to %s when trying to set again", _RUN_ID)
raise RuntimeError("Run ID was already set")
logger.info("Set run_id to %s", run_id)
_RUN_ID = run_id
runner
Contains main() function to continue sessions from files
main(args)
Main function that can be used for proceeding a session.
This function reads a session from a file, proceeds with it until it returns, and then saves the state of the session to a new file. It is used for running tasks in a separate process or on different machines.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
tuple[str] |
The arguments taken from the command line. |
required |
Source code in bandsaw/runner.py
def main(args):
"""
Main function that can be used for proceeding a session.
This function allows to read a session from a file, proceed it until it returns
and then save the state of the session to a new file. It is used for running
tasks in a separate process or on different machines.
Args:
args (tuple[str]): The arguments taken from the command line.
"""
hostname = os.uname()[1]
log_format = (
f"{{asctime}} {hostname} {{process: >5d}} {{thread: >5d}} "
f"{{name}} {{levelname}}: {{message}}"
)
logging.basicConfig(level=logging.INFO, format=log_format, style='{')
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input_session',
help="The session which should be continued",
required=True,
)
parser.add_argument(
'--output',
dest='output_session',
help="The session after continuation ended",
required=True,
)
parser.add_argument(
'--run-id',
dest='run_id',
help="The run id of the workflow",
required=True,
)
args = parser.parse_args(args=args)
set_run_id(args.run_id)
logger.info("Creating new session")
session = Session()
logger.info("Reading session from %s", args.input_session)
with io.FileIO(args.input_session, mode='r') as stream:
session.restore(stream)
logger.info("Proceeding session")
session.proceed()
logger.info("Writing session with result to %s", args.output_session)
with io.FileIO(args.output_session, mode='w') as stream:
session.save(stream)
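`main` expects three required options: `--input`, `--output` and `--run-id`. Passing the parent's run id keeps `get_run_id()` consistent in the child process, which is the use case described for `set_run_id`. The sketch below shows how a parent process might assemble such a command and parses it with the same option definitions. `build_runner_command` is hypothetical, and the assumption that running the archive forwards its command line to `main` is not confirmed by this page.

```python
import argparse
import sys
import uuid


def build_runner_command(archive_path, input_path, output_path, run_id):
    """Hypothetical: command a parent process could use to continue a session."""
    return [
        sys.executable, str(archive_path),
        '--input', str(input_path),
        '--output', str(output_path),
        '--run-id', run_id,
    ]


def parse_runner_arguments(argv):
    # Mirrors the three required options defined in main() above.
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input_session', required=True)
    parser.add_argument('--output', dest='output_session', required=True)
    parser.add_argument('--run-id', dest='run_id', required=True)
    return parser.parse_args(args=argv)


run_id = str(uuid.uuid1())  # in bandsaw this would come from get_run_id()
command = build_runner_command('distribution.pyz', 'in.session', 'out.session', run_id)
arguments = parse_runner_arguments(command[2:])  # skip interpreter and archive
```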
serialization
special
Module for the package bandsaw.serialization. Contains all public types.
json
Contains a `Serializer` that serializes objects to JSON.
JsonSerializer (Serializer)
A `Serializer` which serializes objects to JSON.
Attributes:
Name | Type | Description |
---|---|---|
value_serializers |
List[ValueSerializer] |
A list of serializers that are used for serialization of custom types. |
deserialize(self, stream)
Deserialize a value/object from a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
io.Stream |
The binary stream from where the serialized value is read. |
required |
Returns:
Type | Description |
---|---|
Any |
The object/value which was deserialized. |
Source code in bandsaw/serialization/json.py
def deserialize(self, stream):
"""
Deserialize a value/object from a binary stream.
Args:
stream (io.Stream): The binary stream from where the serialized value is
read.
Returns:
Any: The object/value which was deserialized.
"""
text_stream = io.TextIOWrapper(stream, encoding='utf-8')
result = json.load(
text_stream,
cls=_ExtensibleJSONDecoder,
value_serializers=self.value_serializers,
)
text_stream.detach()
return result
serialize(self, value, stream)
Serialize a value/object into a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The object/value to be serialized. |
required |
stream |
io.Stream |
The binary stream where the serialized value is written. |
required |
Source code in bandsaw/serialization/json.py
def serialize(self, value, stream):
"""
Serialize a value/object into a binary stream.
Args:
value (Any): The object/value to be serialized.
stream (io.Stream): The binary stream where the serialized value is
written.
"""
text_stream = io.TextIOWrapper(stream, encoding='utf-8')
json.dump(
value,
text_stream,
cls=_ExtensibleJSONEncoder,
value_serializers=self.value_serializers,
allow_nan=False,
indent=None,
separators=(',', ':'),
sort_keys=True,
)
text_stream.detach()
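The binary-stream handling above can be reproduced with only the standard library. In the sketch below, `sketch_serialize` and `sketch_deserialize` are hypothetical reductions of the two methods that omit the custom encoder/decoder classes and value serializers. Wrapping the binary stream in `io.TextIOWrapper` handles UTF-8 encoding, and `detach()` flushes the wrapper and hands the binary stream back without closing it. The compact separators and `sort_keys=True` make the output deterministic, and `allow_nan=False` rejects NaN, which is not valid JSON.

```python
import io
import json


def sketch_serialize(value, stream):
    text_stream = io.TextIOWrapper(stream, encoding='utf-8')
    json.dump(value, text_stream, allow_nan=False, indent=None,
              separators=(',', ':'), sort_keys=True)
    # detach() flushes and releases `stream` without closing it.
    text_stream.detach()


def sketch_deserialize(stream):
    text_stream = io.TextIOWrapper(stream, encoding='utf-8')
    value = json.load(text_stream)
    text_stream.detach()
    return value


buffer = io.BytesIO()
sketch_serialize({'b': 1, 'a': [1, 2]}, buffer)
```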
pickle
Contains a Serializer which uses pickle for serializing values.
PickleSerializer (Serializer)
A `Serializer` which serializes objects using pickle.
deserialize(self, stream)
Deserialize a value/object from a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
io.Stream |
The binary stream from where the serialized value is read. |
required |
Returns:
Type | Description |
---|---|
Any |
The object/value which was deserialized. |
Source code in bandsaw/serialization/pickle.py
def deserialize(self, stream):
"""
Deserialize a value/object from a binary stream.
Args:
stream (io.Stream): The binary stream from where the serialized value is
read.
Returns:
Any: The object/value which was deserialized.
"""
return pickle.load(stream)
serialize(self, value, stream)
Serialize a value/object into a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The object/value to be serialized. |
required |
stream |
io.Stream |
The binary stream where the serialized value is written. |
required |
Source code in bandsaw/serialization/pickle.py
def serialize(self, value, stream):
"""
Serialize a value/object into a binary stream.
Args:
value (Any): The object/value to be serialized.
stream (io.Stream): The binary stream where the serialized value is
written.
"""
pickle.dump(value, stream)
serializer
Base classes for serializers which serialize Python values.
Serializer (ABC)
Interface for a `Serializer`, which serializes objects.
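Because `serialize` and `deserialize` are declared with `abc.abstractmethod`, a subclass can only be instantiated once it implements both. The sketch below uses a local copy of the interface (not imported from bandsaw) with a pickle-based implementation similar to `PickleSerializer`, plus a deliberately incomplete subclass. Keep in mind that the Python documentation warns that unpickling can execute arbitrary code, so a pickle-based serializer should only read trusted data.

```python
import abc
import io
import pickle


class Serializer(abc.ABC):
    """Local copy of the documented interface, for illustration only."""

    @abc.abstractmethod
    def serialize(self, value, stream):
        """Write `value` to the binary `stream`."""

    @abc.abstractmethod
    def deserialize(self, stream):
        """Read a value back from the binary `stream`."""


class SketchPickleSerializer(Serializer):
    def serialize(self, value, stream):
        pickle.dump(value, stream)

    def deserialize(self, stream):
        return pickle.load(stream)


class Incomplete(Serializer):
    # Missing deserialize(), so instantiating it raises TypeError.
    def serialize(self, value, stream):
        pass


buffer = io.BytesIO()
serializer = SketchPickleSerializer()
serializer.serialize({"answer": 42}, buffer)
buffer.seek(0)
restored = serializer.deserialize(buffer)
```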
deserialize(self, stream)
Deserialize a value/object from a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
io.Stream |
The binary stream from where the serialized value is read. |
required |
Returns:
Type | Description |
---|---|
Any |
The object/value which was deserialized. |
Source code in bandsaw/serialization/serializer.py
@abc.abstractmethod
def deserialize(self, stream):
"""
Deserialize a value/object from a binary stream.
Args:
stream (io.Stream): The binary stream from where the serialized value is
read.
Returns:
Any: The object/value which was deserialized.
"""
serialize(self, value, stream)
Serialize a value/object into a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The object/value to be serialized. | required |
stream | io.Stream | The binary stream where the serialized value is written. | required |
Source code in bandsaw/serialization/serializer.py
@abc.abstractmethod
def serialize(self, value, stream):
    """
    Serialize a value/object into a binary stream.
    Args:
        value (Any): The object/value to be serialized.
        stream (io.Stream): The binary stream where the serialized value is
            written.
    """
values
A collection of classes for serializing custom objects.
ExceptionSerializer (ValueSerializer)
A ValueSerializer for serializing exceptions.
The serializer saves only the type and the args attribute of the exception. It therefore won't work for all exception types, but it should cover most of them. Other attributes of the exception, e.g. the stack trace, are discarded.
can_serialize_value(self, value)
Returns whether the serializer can serialize a specific value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
boolean | True if this serializer can serialize the given value, otherwise False. |
Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
    return isinstance(value, Exception)
deserialize_value(self, representation)
Returns a deserialized value from its serialized representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
representation | Any | The serialized representation of the value. | required |
Returns:
Type | Description |
---|---|
Any | The deserialized value. |
Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
    module_name = representation['module']
    type_name = representation['type']
    module = importlib.import_module(module_name)
    value_type = getattr(module, type_name)
    return value_type(*representation['args'])
serialize_value(self, value)
Returns a serialized representation of the given value.
The returned representation can use standard Python types like primitive values, lists or dicts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
Any | The serialized representation of the value. |
Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
    state = {
        'type': type(value).__name__,
        'module': type(value).__module__,
        'args': value.args,
    }
    return state
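The round trip, and the limitation mentioned in the class description, can be seen with two standalone functions that repeat the logic of serialize_value and deserialize_value above. The function names and the HttpError class are hypothetical. An exception whose constructor does not accept its own args cannot be recreated this way:

```python
import importlib


def serialize_exception(error):
    # Same fields as ExceptionSerializer.serialize_value: type, module and args.
    return {
        'type': type(error).__name__,
        'module': type(error).__module__,
        'args': error.args,
    }


def deserialize_exception(state):
    # Same steps as ExceptionSerializer.deserialize_value.
    module = importlib.import_module(state['module'])
    return getattr(module, state['type'])(*state['args'])


state = serialize_exception(ValueError('bad value', 42))
restored = deserialize_exception(state)


class HttpError(Exception):
    """Hypothetical exception whose __init__ signature differs from its args."""

    def __init__(self, status, message):
        super().__init__(f"{status}: {message}")
        self.status = status


error = HttpError(404, 'not found')
# error.args is ('404: not found',), so calling HttpError(*error.args) fails.
try:
    type(error)(*error.args)
except TypeError as exc:
    reconstruction_error = exc
```

The restored ValueError carries the same args but no traceback, and the extra status attribute of HttpError would be lost even if reconstruction succeeded.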
SerializableValue (ABC)
Interface for types that can serialize themselves.
deserialize(values) (classmethod)
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/serialization/values.py
@classmethod
@abc.abstractmethod
def deserialize(cls, values):
    """Returns a new instance of a value from its serialized representation."""
serialized(self)
Returns a serializable representation of the value.
Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def serialized(self):
    """Returns a serializable representation of the value."""
SerializableValueSerializer (ValueSerializer)
A ValueSerializer for serializing subclasses of SerializableValue.
The serializer uses the methods defined in SerializableValue and implemented by the individual classes to serialize values. It stores the type of the value and its serialized representation, which allows the value to be recreated from this information.
can_serialize_value(self, value)
Returns whether the serializer can serialize a specific value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
boolean | True if this serializer can serialize the given value, otherwise False. |
Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
    return isinstance(value, SerializableValue)
deserialize_value(self, representation)
Returns a deserialized value from its serialized representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
representation | Any | The serialized representation of the value. | required |
Returns:
Type | Description |
---|---|
Any | The deserialized value. |
Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
    module_name = representation['module']
    type_name = representation['type']
    module = importlib.import_module(module_name)
    value_type = getattr(module, type_name)
    return value_type.deserialize(representation['serialized'])
serialize_value(self, value)
Returns a serialized representation of the given value.
The returned representation can use standard Python types like primitive values, lists or dicts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
Any | The serialized representation of the value. |
Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
    state = {
        'type': type(value).__name__,
        'module': type(value).__module__,
        'serialized': value.serialized(),
    }
    return state
TupleSerializer (ValueSerializer)
A ValueSerializer for serializing tuples.
The serializer supports normal tuples as well as named tuples. When named tuples are deserialized, it first tries to reuse the existing namedtuple type. If the type can't be imported or reused, a new namedtuple type with the same name and fields is created on the fly.
can_serialize_value(self, value)
Returns whether the serializer can serialize a specific value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
boolean | True if this serializer can serialize the given value, otherwise False. |
Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
    return isinstance(value, tuple)
deserialize_value(self, representation)
Returns a deserialized value from its serialized representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
representation | Any | The serialized representation of the value. | required |
Returns:
Type | Description |
---|---|
Any | The deserialized value. |
Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
    if representation['type'] == 'namedtuple':
        # try to import the namedtuple type
        module_name = representation['module']
        type_name = representation['name']
        try:
            module = importlib.import_module(module_name)
            tuple_type = getattr(module, type_name)
        except (ImportError, AttributeError) as error:
            logger.warning(
                "Error importing namedtuple, trying to recreate it: %s", error
            )
            # Recreate a new type
            field_names = ' '.join(representation['fields'])
            tuple_type = collections.namedtuple(
                type_name, field_names, module=module_name
            )
        return tuple_type(*representation['items'])
    return tuple(representation['items'])
serialize_value(self, value)
Returns a serialized representation of the given value.
The returned representation can use standard Python types like primitive values, lists or dicts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
Any | The serialized representation of the value. |
Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
    if hasattr(value, '_fields'):
        state = {
            'type': 'namedtuple',
            'fields': list(value._fields),
            'name': type(value).__name__,
            'module': type(value).__module__,
            'items': list(value),
        }
    else:
        state = {
            'type': 'tuple',
            'items': list(value),
        }
    return state
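The namedtuple fallback can be exercised with two standalone functions that repeat the logic above, minus the logging. The function names and the module name are hypothetical. Pointing the representation at a module that does not exist forces the recreation path:

```python
import collections
import importlib


def serialize_tuple(value):
    # Same structure as TupleSerializer.serialize_value.
    if hasattr(value, '_fields'):
        return {
            'type': 'namedtuple',
            'fields': list(value._fields),
            'name': type(value).__name__,
            'module': type(value).__module__,
            'items': list(value),
        }
    return {'type': 'tuple', 'items': list(value)}


def deserialize_tuple(representation):
    # Same steps as TupleSerializer.deserialize_value (without the logging).
    if representation['type'] == 'namedtuple':
        module_name = representation['module']
        type_name = representation['name']
        try:
            tuple_type = getattr(importlib.import_module(module_name), type_name)
        except (ImportError, AttributeError):
            # The original type is not importable here: recreate it on the fly.
            tuple_type = collections.namedtuple(
                type_name, ' '.join(representation['fields']), module=module_name
            )
        return tuple_type(*representation['items'])
    return tuple(representation['items'])


Point = collections.namedtuple('Point', 'x y')
state = serialize_tuple(Point(1, 2))
# Simulate a receiving process where the defining module does not exist.
state['module'] = 'module_that_does_not_exist'
restored = deserialize_tuple(state)

plain = deserialize_tuple(serialize_tuple((1, 'a', None)))
```

The recreated type has the same name, fields and module name and compares equal to the original tuple, but it is a different class, so isinstance checks against the original Point type do not match.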
ValueSerializer (ABC)
Interface for serializers that can serialize custom values.
can_serialize_value(self, value)
Returns whether the serializer can serialize a specific value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
boolean | True if this serializer can serialize the given value, otherwise False. |
Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def can_serialize_value(self, value):
    """
    Returns whether the serializer can serialize a specific value.
    Args:
        value (Any): The value that should be serialized.
    Returns:
        boolean: `True` if this serializer can serialize the given value,
            otherwise `False`.
    """
deserialize_value(self, representation)
Returns a deserialized value from its serialized representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
representation | Any | The serialized representation of the value. | required |
Returns:
Type | Description |
---|---|
Any | The deserialized value. |
Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def deserialize_value(self, representation):
    """
    Returns a deserialized value from its serialized representation.
    Args:
        representation (Any): The serialized representation of the value.
    Returns:
        Any: The deserialized value.
    """
serialize_value(self, value)
Returns a serialized representation of the given value.
The returned representation can use standard Python types like primitive values, lists or dicts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
Any | The serialized representation of the value. |
Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def serialize_value(self, value):
    """
    Returns a serialized representation of the given value.
    The returned representation can use standard python types like primitive
    values, lists or dicts.
    Args:
        value (Any): The value that should be serialized.
    Returns:
        Any: The serialized representation of the value.
    """
session
Contains classes for representing an advising session.
Session
Class that handles the advising of an execution.
A Session object is given to the individual advices that are called to advise the execution. By calling the appropriate methods, like proceed() to continue or conclude() to end with a result, the advices can influence the final result.
Additionally, the session provides access to the context, which allows advices to keep state, to the execution that is advised, to the configuration that is used for advising, and to the result of the execution.
Attributes:
Name | Type | Description |
---|---|---|
task | bandsaw.tasks.Task | The task that is executed. |
execution | bandsaw.execution.Execution | The execution arguments for the task. |
context | bandsaw.context.Context | The context that can be used for advices to store state. |
result | bandsaw.result.Result | Result of the task if already computed, otherwise None. |
_configuration | bandsaw.config.Configuration | The configuration that is being used for advising this task. |
distribution_archive (property, read-only)
The DistributionArchive which can be used when transferring the session.
run_id (property, read-only)
The run id of the workflow.
serializer (property, read-only)
The serializer that can be used for serializing values.
__init__(self, task=None, execution=None, configuration=None, advice_chain='default') (special method)
Create a new session.
Source code in bandsaw/session.py
def __init__(
    self,
    task=None,
    execution=None,
    configuration=None,
    advice_chain='default',
):
    """
    Create a new session.
    """
    self.task = task
    self.execution = execution
    self.context = {}
    self.result = None
    self._configuration = configuration
    self._advice_chain = advice_chain
    self._moderator = None
conclude(self, result)
Conclude the process of advising with a Result.
This can be used in two cases:
1. Concluding BEFORE the task was actually executed. This will skip all subsequent advices defined later in the advice chain and will skip the task execution. The given result will then be used as preliminary result. All advices that are defined before the calling advice in the advice chain will still be called with their after(session) method.
2. Concluding AFTER the task was actually executed. This will just change the result of the session and continue with all following advices.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
result | bandsaw.result.Result | The result to conclude with. | required |
Source code in bandsaw/session.py
def conclude(self, result):
    """
    Conclude the process of advising with a `Result`.
    This can be used in two cases:
    1. Concluding BEFORE the task was actually executed. This will skip all
       subsequent advices defined later in the advice chain and will skip the
       task execution. The given `result` will then be used as preliminary result.
       All advices that are defined before the calling advice in the advice chain
       will still be called with their `after(session)` method.
    2. Concluding AFTER the task was actually executed. This will just change the
       `result` of the session and continue with all following advices.
    Args:
        result (bandsaw.result.Result): The result to conclude with.
    """
    self.result = result
    self._moderator.skip(self)
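The interplay of proceed() and conclude() can be illustrated with a toy model. The sketch is not bandsaw's Session or _Moderator, whose implementation is not shown here. It assumes that before() methods run in chain order and after() methods in reverse order, uses a plain callable instead of a Task and plain values instead of Result objects, and all class names are hypothetical. A caching advice concludes early on the second run, so the task is skipped while the outer advice still receives its after() call:

```python
class ToySession:
    """Hypothetical stand-in for bandsaw's Session, NOT the library's implementation."""

    def __init__(self, advices, task):
        self.advices = list(advices)
        self.task = task            # plain callable instead of Task + Execution
        self.result = None          # plain value instead of a Result object
        self.context = {}
        self._position = -1         # index of the advice currently being called
        self._before_phase = True

    def initiate(self):
        self.proceed()
        return self.result

    def proceed(self):
        if self._before_phase:
            self._position += 1
            if self._position < len(self.advices):
                self.advices[self._position].before(self)
                return
            # Every before() has proceeded: run the task, then start unwinding.
            self.result = self.task()
            self._before_phase = False
        self._call_next_after()

    def conclude(self, result):
        self.result = result
        # Called from before(): later advices and the task itself are skipped.
        self._before_phase = False
        self._call_next_after()

    def _call_next_after(self):
        # after() runs in reverse chain order (assumed onion model).
        self._position -= 1
        if self._position >= 0:
            self.advices[self._position].after(self)


class RecordingAdvice:
    """Hypothetical advice that records its calls and always proceeds."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def before(self, session):
        self.log.append(f"{self.name}.before")
        session.proceed()

    def after(self, session):
        self.log.append(f"{self.name}.after")
        session.proceed()


class CachingAdvice:
    """Hypothetical advice that concludes early when a cached result exists."""

    def __init__(self, cache):
        self.cache = cache

    def before(self, session):
        if 'result' in self.cache:
            session.conclude(self.cache['result'])
        else:
            session.proceed()

    def after(self, session):
        self.cache['result'] = session.result
        session.proceed()


log, cache, task_calls = [], {}, []


def task():
    task_calls.append('run')
    return 42


chain = [RecordingAdvice('outer', log), CachingAdvice(cache)]
first = ToySession(chain, task).initiate()   # runs the task, fills the cache
second = ToySession(chain, task).initiate()  # concludes from the cache
```

On the second run the task is not executed, yet the outer advice still records both its before and after calls, which corresponds to the first case described for conclude().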
initiate(self)
Start the process of advising an execution.
Returns:
Type | Description |
---|---|
bandsaw.result.Result | The final result of the execution after all advices. |
Source code in bandsaw/session.py
def initiate(self):
    """
    Start the process of advising an execution.
    Returns:
        bandsaw.result.Result: The final result of the execution after all
            advices.
    """
    self._moderator = _Moderator(
        self._configuration.get_advice_chain(self._advice_chain)
    )
    logger.debug("running extensions before advice")
    for extension in self._configuration.extensions:
        extension.on_before_advice(self.task, self.execution, self.context)
    self.proceed()
    if not self._moderator.is_finished:
        raise RuntimeError(
            f"Not all advice has been applied. "
            f"Misbehaving advice {self._moderator.current_advice}"
        )
    logger.debug("running extensions after advice")
    for extension in self._configuration.extensions:
        extension.on_after_advice(
            self.task, self.execution, self.context, self.result
        )
    return self.result
proceed(self)
Continue the process of advising with the next advice.
Source code in bandsaw/session.py
def proceed(self):
    """
    Continue the process of advising with the next advice.
    """
    self._moderator.next(self)
restore(self, stream)
Resume a previously suspended session.
Source code in bandsaw/session.py
def restore(self, stream):
    """
    Resume a previously suspended session.
    """
    self._load_from_zip(stream)
    return self
save(self, stream)
Suspend the session to be resumed later or elsewhere.
Source code in bandsaw/session.py
def save(self, stream):
    """
    Suspend the session to be resumed later or elsewhere.
    """
    self._store_as_zip(stream)
tasks
Contains classes and functions representing different types of tasks.
Task (SerializableValue, ABC)
Base class for the different types of tasks that can be executed.
Attributes:
Name | Type | Description |
---|---|---|
task_id | str | A unique identifier for the individual tasks. |
kwargs | dict | A dictionary with additional arguments provided at task definition. |
source | str | The Python source code as string which defines the task. |
bytecode | bytes | The compiled byte code of the task definition. |
bytecode (property, read-only)
The compiled byte code of the task definition as bytes.
kwargs (property, read-only)
Additional arguments defined at task definition.
source (property, read-only)
The Python source code as str which defines the task.
create_task(obj, task_kwargs=None) (classmethod)
Factory for creating a task for different Python objects.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj | Any | Python object that should be run as a task. | required |
task_kwargs | dict | A dictionary containing additional task arguments. | None |
Returns:
Type | Description |
---|---|
bandsaw.tasks.Task | Instance of the Task class that allows the task to be executed. |
Exceptions:
Type | Description |
---|---|
TypeError | If there is no support for this type of Python object. |
Source code in bandsaw/tasks.py
@classmethod
def create_task(cls, obj, task_kwargs=None):
    """
    Factory for creating a task for different Python objects.
    Args:
        obj (Any): Python object that should be run as a task.
        task_kwargs (dict): A dictionary containing additional task arguments.
    Returns:
        bandsaw.tasks.Task: Instance of `Task` class that allows to execute the
            task.
    Raises:
        TypeError: If there is no support for this type of python object.
    """
    if task_kwargs is None:
        task_kwargs = {}
    if isinstance(obj, types.FunctionType):
        if '.<locals>.' in obj.__qualname__:
            return _FunctionWithClosureTask(obj, task_kwargs)
        function_name, module_name = object_as_import(obj)
        return _FunctionTask(function_name, module_name, task_kwargs)
    raise TypeError(f"Unsupported task object of type {type(obj)}")
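The dispatch in create_task depends only on the object's type and its __qualname__. A small sketch, where classify_task_object is a hypothetical helper that returns labels instead of building task objects, shows which branch a given object takes. Functions defined inside another function carry '.<locals>.' in their qualified name and cannot be re-imported by module and name, so they take the closure branch:

```python
import types


def classify_task_object(obj):
    """Hypothetical helper mirroring the branches of Task.create_task."""
    if isinstance(obj, types.FunctionType):
        if '.<locals>.' in obj.__qualname__:
            return 'function with closure'   # create_task: _FunctionWithClosureTask
        return 'importable function'          # create_task: _FunctionTask
    raise TypeError(f"Unsupported task object of type {type(obj)}")


def module_level_task():
    return 1


def make_nested_task():
    def nested_task():
        return 2
    return nested_task
```

Only plain Python functions are accepted; built-in functions, bound methods and other callables fail the types.FunctionType check and raise TypeError.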
execute(self, execution)
Execute the task with the arguments specified by the execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
execution | bandsaw.execution.Execution | The definition which contains how the task should be executed. | required |
Returns:
Type | Description |
---|---|
bandsaw.result.Result | A Result object with either the returned value from the task or an exception that was raised by the task. |
Source code in bandsaw/tasks.py
def execute(self, execution):
    """
    Execute the task with the arguments specified by the execution.
    Args:
        execution (bandsaw.execution.Execution): The definition which contains how
            the task should be executed.
    Returns:
        bandsaw.result.Result: A `Result` object with either the returned value
            from the task or an exception that was raised by the task.
    """
    try:
        result_value = self._execute(execution.args, execution.kwargs)
        result = Result(value=result_value)
    except Exception as error:  # pylint: disable=W0703 # too general exception
        result = Result(exception=error)
    return result
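Task.execute does not let an exception escape from the task; it wraps either outcome in a Result. The following is a minimal sketch of that pattern, using a hypothetical ToyResult dataclass with value and exception fields (modeled on the Result(value=...) and Result(exception=...) calls above) and plain positional and keyword arguments instead of an Execution object:

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ToyResult:
    """Hypothetical stand-in for bandsaw.result.Result."""
    value: Any = None
    exception: Optional[BaseException] = None


def execute(func, args=(), kwargs=None):
    # Same pattern as Task.execute: return values and exceptions both become results.
    try:
        return ToyResult(value=func(*args, **(kwargs or {})))
    except Exception as error:  # deliberately broad, like the original
        return ToyResult(exception=error)


ok = execute(divmod, (7, 2))
failed = execute(divmod, (7, 0))
```

Because only Exception subclasses are caught, KeyboardInterrupt and SystemExit still propagate to the caller.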
user
Contains functions related to users.
get_current_username()
Returns the name of the user who is currently running the Python process.
Returns:
Type | Description |
---|---|
str | The name of the user on the local system. |
Source code in bandsaw/user.py
def get_current_username():
    """
    Returns the name of the user which is currently running the python process.
    Returns:
        str: The name of the user on the local system.
    """
    return pwd.getpwuid(os.getuid())[0]