API

A library for splitting python workflows into separate tasks

advice 🔗

Contains classes and functions for advising tasks.

Advice 🔗

Interface that needs to be implemented by an advice.

An advice implements two methods, before(session) and after(session), which are called while a task execution is being advised. Both take a single argument, session, which is an instance of the class Session. Through this object an advice can influence the task execution, either by changing the way the task is called or by changing its result.
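A minimal sketch of a custom advice may help to illustrate the contract. So that it runs without bandsaw installed, `Advice` and `FakeSession` below are hypothetical stand-ins, not the library classes, and `TimingAdvice` is an invented example. Only `proceed()`, `conclude()` and `context` are taken from this page.

```python
import time


class Advice:
    """Stand-in for bandsaw.advice.Advice with the documented defaults."""

    def before(self, session):
        session.proceed()

    def after(self, session):
        session.proceed()


class FakeSession:
    """Hypothetical session that records calls instead of running a task."""

    def __init__(self):
        self.context = {}
        self.calls = []

    def proceed(self):
        self.calls.append("proceed")

    def conclude(self, result):
        self.calls.append(("conclude", result))


class TimingAdvice(Advice):
    """Stores the time between before() and after() in the session context."""

    def before(self, session):
        session.context["started"] = time.monotonic()
        session.proceed()  # hand over to the next advice or the task

    def after(self, session):
        started = session.context.pop("started")
        session.context["elapsed"] = time.monotonic() - started
        session.proceed()  # keep the current result unchanged


session = FakeSession()
advice = TimingAdvice()
advice.before(session)
advice.after(session)
```

Both methods end with `session.proceed()`, which is the part of the contract that is easiest to forget when overriding them.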

after(self, session) 🔗

Called after the task is actually executed.

This method allows the individual advice to make changes to the result of the task execution. The result can be retrieved from the session.

In order to continue, the advice MUST either call session.proceed(), which will continue the process with the current result and the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will set a different result and continue with it.

The default implementation will just call session.proceed().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | bandsaw.session.Session | The session of the execution. | required |

Source code in bandsaw/advice.py
def after(self, session):  # pylint: disable=R0201 # no-self-use
    """
    Called after the task is actually executed.

    This method allows the individual advice to make changes to the result of
    the task execution. The result can be retrieved from the `session`.

    In order to continue, the advice MUST either call `session.proceed()`, which
    will continue the process with current `result` and the next advice in the
    advice chain, or call `session.conclude(result)` with a `Result` instance,
    which will set a different result and continue with it.

    The default implementation will just call `session.proceed()`.

    Args:
        session (bandsaw.session.Session): The session of the execution.
    """
    session.proceed()

before(self, session) 🔗

Called before the task is actually executed.

This method allows the individual advice to change how the task is later executed. In order to continue, the advice MUST either call session.proceed(), which will continue the process with the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will skip the following advices and return without executing the task at all.

The default implementation will just call session.proceed().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | bandsaw.session.Session | The session of the execution. | required |

Source code in bandsaw/advice.py
def before(self, session):  # pylint: disable=R0201 # no-self-use
    """
    Called before the task is actually executed.

    This method allows the individual advice to make changes to the way the
    task execution is later executed. In order to continue, the advice MUST either
    call `session.proceed()`, which will continue the process with the next
    advice in the advice chain, or call `session.conclude(result)` with a `Result`
    instance, which will skip the following advices and return without executing
    the task execution at all.

    The default implementation will just call `session.proceed()`.

    Args:
        session (bandsaw.session.Session): The session of the execution.
    """
    session.proceed()

advise_task_with_chain(task, execution, configuration, advice_chain='default') 🔗

Executes a Task with additional advices.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task | bandsaw.tasks.Task | The task to be executed. | required |
| execution | bandsaw.execution.Execution | The execution definition for the task. | required |
| configuration | bandsaw.config.Configuration | The configuration which should be used during advising. | required |
| advice_chain | str | The name of the advice chain which contains the additional advices to be applied to the task. Defaults to 'default'. | 'default' |

Returns:

| Type | Description |
| --- | --- |
| bandsaw.result.Result | The result of the task execution. |

Source code in bandsaw/advice.py
def advise_task_with_chain(task, execution, configuration, advice_chain='default'):
    """
    Executes a `Task` with additional advices.

    Args:
        task (bandsaw.tasks.Task): The task to be executed.
        execution (bandsaw.execution.Execution): The execution definition for the task.
        configuration (bandsaw.config.Configuration): The configuration which should
            be used during advising.
        advice_chain (str): The name of the advice chain which contains the additional
            advices to be applied to the task. Defaults to 'default'.

    Returns:
        bandsaw.result.Result: The result of the task execution.

    """
    session = Session(task, execution, configuration, advice_chain)
    return session.initiate()

advices special 🔗

Package that contains reusable Advice classes

cache 🔗

Contains Advice that can cache task results in a local file system.

CachingAdvice (Advice) 🔗

Advice that caches results in a local filesystem.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| directory | Path | The path to the directory where the results are cached. |

after(self, session) 🔗

Called after the task is actually executed.

This method allows the individual advice to make changes to the result of the task execution. The result can be retrieved from the session.

In order to continue, the advice MUST either call session.proceed(), which will continue the process with the current result and the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will set a different result and continue with it.

The default implementation will just call session.proceed().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | bandsaw.session.Session | The session of the execution. | required |

Source code in bandsaw/advices/cache.py
def after(self, session):
    cache_item_path = pathlib.Path(session.context['cache-item-path'])
    if not cache_item_path.exists():
        cache_item_directory = cache_item_path.parent
        if not cache_item_directory.exists():
            cache_item_directory.mkdir(parents=True)

        logger.info("Storing result in cache '%s'", cache_item_path)

        with open(cache_item_path, 'wb') as stream:
            session.serializer.serialize(session.result, stream)
    session.proceed()
before(self, session) 🔗

Called before the task is actually executed.

This method allows the individual advice to change how the task is later executed. In order to continue, the advice MUST either call session.proceed(), which will continue the process with the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will skip the following advices and return without executing the task at all.

The default implementation will just call session.proceed().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | bandsaw.session.Session | The session of the execution. | required |

Source code in bandsaw/advices/cache.py
def before(self, session):
    artifact_id = session.task.task_id
    revision_id = session.execution.execution_id

    cache_item_path = self.directory / artifact_id / revision_id
    session.context['cache-item-path'] = str(cache_item_path)
    if cache_item_path.exists():
        logger.info("Using result from cache '%s'", cache_item_path)

        with open(cache_item_path, 'rb') as stream:
            result = session.serializer.deserialize(stream)
        session.conclude(result)
        return
    session.proceed()
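The interplay of before() and after() in the CachingAdvice can be sketched as one plain function. This is a simplified illustration, not the library code: `cached_call` and its parameters are hypothetical, and `pickle` stands in for `session.serializer`, whose format is not described on this page. Only the `<directory>/<task_id>/<execution_id>` layout is taken from the source above.

```python
import pathlib
import pickle
import tempfile


def cached_call(directory, task_id, execution_id, compute):
    """Return (result, from_cache) using <directory>/<task_id>/<execution_id>."""
    cache_item_path = pathlib.Path(directory) / task_id / execution_id
    if cache_item_path.exists():
        # Cache hit: corresponds to before() calling session.conclude(result).
        with open(cache_item_path, "rb") as stream:
            return pickle.load(stream), True

    # Cache miss: corresponds to before() calling session.proceed().
    result = compute()

    # Corresponds to after(): create the parent directory and store the result.
    cache_item_path.parent.mkdir(parents=True, exist_ok=True)
    with open(cache_item_path, "wb") as stream:
        pickle.dump(result, stream)
    return result, False


with tempfile.TemporaryDirectory() as directory:
    first = cached_call(directory, "task-1", "exec-1", lambda: 21 * 2)
    second = cached_call(directory, "task-1", "exec-1", lambda: 0)
```

The second call never runs its `compute` function, because the cache item written by the first call already exists.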

log 🔗

Contains an Advice implementation which adds logging

LoggingAdvice (Advice) 🔗

An Advice which adds additional logging

after(self, session) 🔗

Called after the task is actually executed.

This method allows the individual advice to make changes to the result of the task execution. The result can be retrieved from the session.

In order to continue, the advice MUST either call session.proceed(), which will continue the process with the current result and the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will set a different result and continue with it.

The default implementation will just call session.proceed().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | bandsaw.session.Session | The session of the execution. | required |

Source code in bandsaw/advices/log.py
def after(self, session):
    logger.info(
        "AFTER %s:%s with context %s",
        session.task.task_id,
        session.execution.execution_id,
        session.context,
    )
    session.proceed()
before(self, session) 🔗

Called before the task is actually executed.

This method allows the individual advice to change how the task is later executed. In order to continue, the advice MUST either call session.proceed(), which will continue the process with the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will skip the following advices and return without executing the task at all.

The default implementation will just call session.proceed().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | bandsaw.session.Session | The session of the execution. | required |

Source code in bandsaw/advices/log.py
def before(self, session):
    logger.info(
        "BEFORE %s:%s with context %s",
        session.task.task_id,
        session.execution.execution_id,
        session.context,
    )
    session.proceed()

ssh 🔗

Contains code for running tasks remotely via SSH

Remote 🔗

Definition of a remote machine.

Remotes are used for executing sessions on remote machines.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| host | str | The hostname of the machine, where this interpreter is run. |
| port | int | The port where ssh is listening for connections. Defaults to 22. |
| user | str | The name of the user, under which the interpreter will run. Defaults to the name of the local user. |
| key_file | str | Path to a file containing the key to use for authentication. |
| interpreter | bandsaw.interpreter.Interpreter | The interpreter which should be used, including its executable and python path. |
| directory | str | Remote directory where temporary files are stored. If None, defaults to '/tmp'. |

SshAdvice (Advice) 🔗

Advice that moves and proceeds a session on a remote machine via SSH

__init__(self, directory=None, backend=SshCommandLineBackend()) special 🔗

Create a new instance.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| directory | str | The local directory where temporary files are stored to exchange data between the local and the remote machine. If None a temporary directory is used. | None |

Source code in bandsaw/advices/ssh.py
def __init__(self, directory=None, backend=SshCommandLineBackend()):
    """
    Create a new instance.

    Args:
        directory (str): The local directory where temporary files are stored to
            exchange data between the local and the remote machine. If `None` a
            temporary directory is used.
    """
    if directory is None:
        self.directory = pathlib.Path(tempfile.mkdtemp())
    else:
        self.directory = pathlib.Path(directory)
    logger.info("Using directory %s for exchange data", self.directory)

    self.remotes = {}
    self._backend = backend
    super().__init__()
add_remote(self, remote, name='default') 🔗

Add a new definition of a remote machine.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| remote | bandsaw.advices.ssh.Remote | Definition of the remote machine. | required |
| name | str | Name of the remote. Defaults to 'default'. | 'default' |

Returns:

| Type | Description |
| --- | --- |
| bandsaw.advices.ssh.SshAdvice | The advice with the added remote. |

Source code in bandsaw/advices/ssh.py
def add_remote(self, remote, name='default'):
    """
    Add a new definition of a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Definition of the remote machine.
        name (str): Name of the remote. Defaults to `default`.

    Returns:
        bandsaw.advices.ssh.SshAdvice: The advice with the added remote.
    """
    self.remotes[name] = remote
    return self
after(self, session) 🔗

Called after the task is actually executed.

This method allows the individual advice to make changes to the result of the task execution. The result can be retrieved from the session.

In order to continue, the advice MUST either call session.proceed(), which will continue the process with the current result and the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will set a different result and continue with it.

The default implementation will just call session.proceed().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | bandsaw.session.Session | The session of the execution. | required |

Source code in bandsaw/advices/ssh.py
def after(self, session):
    logger.info("after called in process %d", os.getpid())
    logger.info("Remote process created result %s", session.result)
    logger.info("Returning to end remote session and continue in parent")
before(self, session) 🔗

Called before the task is actually executed.

This method allows the individual advice to change how the task is later executed. In order to continue, the advice MUST either call session.proceed(), which will continue the process with the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will skip the following advices and return without executing the task at all.

The default implementation will just call session.proceed().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | bandsaw.session.Session | The session of the execution. | required |

Source code in bandsaw/advices/ssh.py
def before(self, session):

    session_id = session.execution.execution_id

    session_in_path = pathlib.Path(
        tempfile.mktemp('.zip', 'in-' + session_id + '-', self.directory)
    )
    session_out_path = pathlib.Path(
        tempfile.mktemp('.zip', 'out-' + session_id + '-', self.directory)
    )

    logger.info("Writing session to %s", session_in_path)
    with io.FileIO(str(session_in_path), mode='w') as stream:
        session.save(stream)

    remote_name = session.task.kwargs.get('ssh', {}).get('remote', 'default')
    remote = self.remotes[remote_name]

    remote_run_directory = remote.directory / session.execution.execution_id
    logger.info(
        "Creating run directory %s on host %s",
        remote_run_directory,
        remote.host,
    )
    self._backend.create_dir(
        remote,
        remote_run_directory,
    )

    logger.info("Copying over distribution archive to host %s", remote.host)
    distribution_archive_path = session.distribution_archive.path
    remote_distribution_archive_path = (
        remote_run_directory / distribution_archive_path.name
    )
    self._backend.copy_file_to_remote(
        remote,
        distribution_archive_path,
        remote_distribution_archive_path,
    )

    logger.info("Copying over session to host %s", remote.host)
    remote_session_in_path = remote_run_directory / session_in_path.name
    self._backend.copy_file_to_remote(
        remote,
        session_in_path,
        remote_session_in_path,
    )

    remote_session_out_path = remote_run_directory / session_out_path.name

    logger.info(
        "Running remote process using interpreter %s", remote.interpreter.executable
    )
    self._backend.execute_remote(
        remote,
        remote.interpreter.executable,
        str(remote_distribution_archive_path),
        '--input',
        str(remote_session_in_path),
        '--output',
        str(remote_session_out_path),
        '--run-id',
        session.run_id,
    )
    # environment = self.interpreter.environment
    # environment['PYTHONPATH'] = ':'.join(self.interpreter.path)
    logger.info("Remote process exited")

    logger.info("Copying over session result from host %s", remote.host)
    self._backend.copy_file_from_remote(
        remote,
        remote_session_out_path,
        session_out_path,
    )

    logger.info(
        "Cleaning up remote directory %s on host %s",
        remote_run_directory,
        remote.host,
    )
    self._backend.delete_dir(remote, remote_run_directory)

    logger.info("Restore local session from %s", session_out_path)
    with io.FileIO(str(session_out_path), mode='r') as stream:
        session.restore(stream)

    logger.info("Proceed local session")
    session.proceed()

SshBackend (ABC) 🔗

Interface definition for different SSH backends.
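Because SshAdvice accepts a `backend` argument, an alternative backend can be supplied, for example one that only records calls in a test. The sketch below is an assumption-laden illustration: the `SshBackend` class here is a local stand-in with the same five method names as the interface documented in this section, and `RecordingBackend` is a hypothetical name that is not part of bandsaw.

```python
import abc


class SshBackend(abc.ABC):
    """Stand-in for bandsaw.advices.ssh.SshBackend (same method names)."""

    def create_dir(self, remote, remote_path):
        """Create a directory on a remote machine."""

    def delete_dir(self, remote, remote_path):
        """Delete a directory on a remote machine."""

    def copy_file_to_remote(self, remote, local_path, remote_path):
        """Copy a local file to a remote machine."""

    def copy_file_from_remote(self, remote, remote_path, local_path):
        """Copy a remote file to the local file system."""

    def execute_remote(self, remote, executable, *arguments):
        """Execute an executable on a remote machine."""


class RecordingBackend(SshBackend):
    """Hypothetical backend that records every call instead of using SSH."""

    def __init__(self):
        self.calls = []

    def create_dir(self, remote, remote_path):
        self.calls.append(("create_dir", str(remote_path)))

    def delete_dir(self, remote, remote_path):
        self.calls.append(("delete_dir", str(remote_path)))

    def copy_file_to_remote(self, remote, local_path, remote_path):
        self.calls.append(("copy_to", str(local_path), str(remote_path)))

    def copy_file_from_remote(self, remote, remote_path, local_path):
        self.calls.append(("copy_from", str(remote_path), str(local_path)))

    def execute_remote(self, remote, executable, *arguments):
        self.calls.append(("execute", str(executable)) + tuple(arguments))


backend = RecordingBackend()
remote = object()  # placeholder; the recording backend never inspects it
backend.create_dir(remote, "/tmp/run-1")
backend.execute_remote(remote, "/usr/bin/python3", "--run-id", "42")
backend.delete_dir(remote, "/tmp/run-1")
```

The recorded list makes it possible to assert on the order of remote operations without a network connection.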

copy_file_from_remote(self, remote, remote_path, local_path) 🔗

Copies a remote file or directory to the local file system.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| remote | bandsaw.advices.ssh.Remote | Remote machine from which a file should be copied. | required |
| remote_path | pathlib.Path | Remote path of the file which should be copied. | required |
| local_path | pathlib.Path | Local path to the file where it should be copied to. | required |

Source code in bandsaw/advices/ssh.py
def copy_file_from_remote(self, remote, remote_path, local_path):
    """
    Copies a remote file or directory to the local file system.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine from which a file
            should be copied.
        remote_path (pathlib.Path): Remote path of the file which should be
            copied.
        local_path (pathlib.Path): Local path to the file where it should be
            copied to.
    """
copy_file_to_remote(self, remote, local_path, remote_path) 🔗

Copies a local file or directory to a remote machine.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| remote | bandsaw.advices.ssh.Remote | Remote machine to which a file should be copied. | required |
| local_path | pathlib.Path | Local path to the file which should be copied over. | required |
| remote_path | pathlib.Path | Remote path of the file where it should be copied to. | required |

Source code in bandsaw/advices/ssh.py
def copy_file_to_remote(self, remote, local_path, remote_path):
    """
    Copies a local file or directory to a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine to which a file should
            be copied.
        local_path (pathlib.Path): Local path to the file which should be copied
            over.
        remote_path (pathlib.Path): Remote path of the file where it should be
            copied to.
    """
create_dir(self, remote, remote_path) 🔗

Create a directory on a remote machine.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| remote | bandsaw.advices.ssh.Remote | Remote machine where a directory will be created. | required |
| remote_path | str | Remote path to the directory that should be created. | required |

Source code in bandsaw/advices/ssh.py
def create_dir(self, remote, remote_path):
    """
    Create a directory on a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine where a directory will
            be created.
        remote_path (str): Remote path to the directory that should be created.
    """
delete_dir(self, remote, remote_path) 🔗

Delete a directory on a remote machine.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| remote | bandsaw.advices.ssh.Remote | Remote machine where a directory will be deleted. | required |
| remote_path | str | Remote path to the directory that should be deleted. | required |

Source code in bandsaw/advices/ssh.py
def delete_dir(self, remote, remote_path):
    """
    Delete a directory on a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine where a directory will
            be deleted.
        remote_path (str): Remote path to the directory that should be deleted.
    """
execute_remote(self, remote, executable, *arguments) 🔗

Executes an executable on a remote machine.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| remote | bandsaw.advices.ssh.Remote | Remote machine where executable will be executed. | required |
| executable | str | Remote path of the executable which should be executed. | required |
| *arguments | str | Positional arguments which are the command line parameter for the executable. | () |

Exceptions:

| Type | Description |
| --- | --- |
| subprocess.CalledProcessError | If the remote process ends with an error. Its return code will be available through the exception. |

Source code in bandsaw/advices/ssh.py
def execute_remote(self, remote, executable, *arguments):
    """
    Executes an executable on a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine where `executable`
            will be executed.
        executable (str): Remote path of the executable which should be executed.
        *arguments (str): Positional arguments which are the command line
            parameter for the `executable`.

    Raises:
        subprocess.CalledProcessError: If the remote process ends with an error.
            Its return code will be available through the exception.
    """

SshCommandLineBackend (SshBackend) 🔗

SSH backend that uses the SSH command line tools.

copy_file_from_remote(self, remote, remote_path, local_path) 🔗

Copies a remote file or directory to the local file system.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| remote | bandsaw.advices.ssh.Remote | Remote machine from which a file should be copied. | required |
| remote_path | pathlib.Path | Remote path of the file which should be copied. | required |
| local_path | pathlib.Path | Local path to the file where it should be copied to. | required |

Source code in bandsaw/advices/ssh.py
def copy_file_from_remote(self, remote, remote_path, local_path):
    copy_source = self.get_remote_path(remote, remote_path)
    self._run(
        [
            'scp',
            '-P',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            copy_source,
            str(local_path),
        ],
    )
copy_file_to_remote(self, remote, local_path, remote_path) 🔗

Copies a local file or directory to a remote machine.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| remote | bandsaw.advices.ssh.Remote | Remote machine to which a file should be copied. | required |
| local_path | pathlib.Path | Local path to the file which should be copied over. | required |
| remote_path | pathlib.Path | Remote path of the file where it should be copied to. | required |

Source code in bandsaw/advices/ssh.py
def copy_file_to_remote(self, remote, local_path, remote_path):
    copy_destination = self.get_remote_path(remote, remote_path)
    self._run(
        [
            'scp',
            '-P',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            str(local_path),
            copy_destination,
        ],
    )
create_dir(self, remote, remote_path) 🔗

Create a directory on a remote machine.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| remote | bandsaw.advices.ssh.Remote | Remote machine where a directory will be created. | required |
| remote_path | str | Remote path to the directory that should be created. | required |

Source code in bandsaw/advices/ssh.py
def create_dir(self, remote, remote_path):
    return self._run(
        [
            'ssh',
            '-p',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            self.login(remote),
            'mkdir',
            '-p',
            str(remote_path),
        ]
    )
delete_dir(self, remote, remote_path) 🔗

Delete a directory on a remote machine.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| remote | bandsaw.advices.ssh.Remote | Remote machine where a directory will be deleted. | required |
| remote_path | str | Remote path to the directory that should be deleted. | required |

Source code in bandsaw/advices/ssh.py
def delete_dir(self, remote, remote_path):
    return self._run(
        [
            'ssh',
            '-p',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            self.login(remote),
            'rm',
            '-Rf',
            str(remote_path),
        ]
    )
execute_remote(self, remote, executable, *arguments) 🔗

Executes an executable on a remote machine.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| remote | bandsaw.advices.ssh.Remote | Remote machine where executable will be executed. | required |
| executable | str | Remote path of the executable which should be executed. | required |
| *arguments | str | Positional arguments which are the command line parameter for the executable. | () |

Exceptions:

| Type | Description |
| --- | --- |
| subprocess.CalledProcessError | If the remote process ends with an error. Its return code will be available through the exception. |

Source code in bandsaw/advices/ssh.py
def execute_remote(self, remote, executable, *arguments):
    return self._run(
        [
            'ssh',
            '-p',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            self.login(remote),
            str(executable),
        ]
        + list(arguments),
    )
get_remote_path(self, remote, path) 🔗

Returns the remote path in the form <user>@<host>:<path>.

Source code in bandsaw/advices/ssh.py
def get_remote_path(self, remote, path):
    """Returns the remote path in form of <user>@<host>:<path>"""
    return f"{self.login(remote)}:{path.absolute()}"
login(remote) staticmethod 🔗

Returns the destination of the remote in the form <user>@<host>.

Source code in bandsaw/advices/ssh.py
@staticmethod
def login(remote):
    """Returns the destination of the remote in form of <user>@<host>"""
    return f"{remote.user}@{remote.host}"
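The argument lists that the command line backend passes to `ssh` and `scp` can be reproduced in isolation, which makes the difference between the two port flags easier to see. The following is a sketch under assumptions: `Remote` is a hypothetical stand-in dataclass, and `key_file_option` guesses at the private `_key_file_option` helper, whose source is not shown on this page. The remote path is used as given rather than resolved with `path.absolute()`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Remote:
    """Hypothetical stand-in with the attributes the backend reads."""

    host: str
    user: str
    port: int = 22
    key_file: Optional[str] = None


def login(remote):
    """Destination in the form <user>@<host>."""
    return f"{remote.user}@{remote.host}"


def key_file_option(remote):
    # Assumption: the private helper adds `-i <key_file>` when a key is set.
    return ["-i", remote.key_file] if remote.key_file else []


def ssh_command(remote, executable, *arguments):
    """Mirrors execute_remote(): ssh takes the port as lowercase -p."""
    return (
        ["ssh", "-p", str(remote.port)]
        + key_file_option(remote)
        + [login(remote), str(executable)]
        + list(arguments)
    )


def scp_upload_command(remote, local_path, remote_path):
    """Mirrors copy_file_to_remote(): scp takes the port as uppercase -P."""
    return (
        ["scp", "-P", str(remote.port)]
        + key_file_option(remote)
        + [str(local_path), f"{login(remote)}:{remote_path}"]
    )


plain = ssh_command(Remote("example.org", "alice"), "python3", "--version")
upload = scp_upload_command(
    Remote("example.org", "alice", port=2222, key_file="id_ed25519"),
    "session.zip",
    "/tmp/run/session.zip",
)
```

Building the command as a list, as the backend does, avoids shell quoting issues on the local side.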

subprocess 🔗

Contains Advice implementation that runs the execution in a subprocess

SubprocessAdvice (Advice) 🔗

Advice that runs in a subprocess

__init__(self, directory=None, interpreter=None) special 🔗

Create a new instance.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| directory | str | The directory where temporary files are stored to exchange data between both processes. If None a temporary directory is used. | None |
| interpreter | bandsaw.interpreter.Interpreter | The interpreter to use in the subprocess. If None the same interpreter will be used. | None |

Source code in bandsaw/advices/subprocess.py
def __init__(self, directory=None, interpreter=None):
    """
    Create a new instance.

    Args:
        directory (str): The directory where temporary files are stored to
            exchange data between both processes. If `None` a temporary directory
            is used.
        interpreter (bandsaw.interpreter.Interpreter): The interpreter to use in
            the subprocess. If `None` the same interpreter will be used.
    """
    if directory is None:
        self.directory = pathlib.Path(tempfile.mkdtemp())
    else:
        self.directory = pathlib.Path(directory)
    logger.info("Using directory %s", self.directory)
    self.interpreter = interpreter or Interpreter()
    super().__init__()
after(self, session) 🔗

Called after the task is actually executed.

This method allows the individual advice to make changes to the result of the task execution. The result can be retrieved from the session.

In order to continue, the advice MUST either call session.proceed(), which will continue the process with the current result and the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will set a different result and continue with it.

The default implementation will just call session.proceed().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | bandsaw.session.Session | The session of the execution. | required |

Source code in bandsaw/advices/subprocess.py
def after(self, session):
    logger.info("after called in process %d", os.getpid())
    logger.info("Sub process created result %s", session.result)
    logger.info("Returning to end session and continue in parent")
before(self, session) 🔗

Called before the task is actually executed.

This method allows the individual advice to change how the task is later executed. In order to continue, the advice MUST either call session.proceed(), which will continue the process with the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will skip the following advices and return without executing the task at all.

The default implementation will just call session.proceed().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | bandsaw.session.Session | The session of the execution. | required |

Source code in bandsaw/advices/subprocess.py
def before(self, session):
    logger.info("before called in process %d", os.getpid())

    session_id = session.execution.execution_id

    session_in_file, session_in_path = tempfile.mkstemp(
        '.zip', 'in-' + session_id + '-', self.directory
    )
    session_out_file, session_out_path = tempfile.mkstemp(
        '.zip', 'out-' + session_id + '-', self.directory
    )
    archive_path = session.distribution_archive.path

    logger.info("Writing session to %s", session_in_path)
    with io.FileIO(session_in_file, mode='w') as stream:
        session.save(stream)

    logger.info(
        "Continue session in subprocess using interpreter %s and "
        "distribution archive %s",
        self.interpreter.executable,
        archive_path,
    )
    environment = self.interpreter.environment
    environment['PYTHONPATH'] = ':'.join(self.interpreter.path)
    subprocess.check_call(
        [
            self.interpreter.executable,
            archive_path,
            '--input',
            session_in_path,
            '--output',
            session_out_path,
            '--run-id',
            session.run_id,
        ],
        env=environment,
    )
    logger.info("Sub process exited")

    logger.info("Reading session from %s", session_out_path)
    with io.FileIO(session_out_file, mode='r') as stream:
        session.restore(stream)

    logger.info("proceed() session in parent process")
    session.proceed()
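The exchange pattern used by before() above (write the state to an input file, run a child process with `--input` and `--output`, read the state back) can be tried out in a self-contained form. This sketch is not the bandsaw implementation: JSON stands in for `session.save()` and `session.restore()`, the child program is an inline script instead of the distribution archive, and `run_in_subprocess` is a hypothetical name.

```python
import json
import os
import subprocess
import sys
import tempfile

# Child program: reads JSON from --input, writes a modified copy to --output.
CHILD = """
import json, sys
args = dict(zip(sys.argv[1::2], sys.argv[2::2]))
with open(args['--input']) as stream:
    state = json.load(stream)
state['result'] = state['value'] * 2
with open(args['--output'], 'w') as stream:
    json.dump(state, stream)
"""


def run_in_subprocess(state):
    """Send `state` through a child process and return what it wrote back."""
    with tempfile.TemporaryDirectory() as directory:
        in_fd, in_path = tempfile.mkstemp(".json", "in-", directory)
        out_fd, out_path = tempfile.mkstemp(".json", "out-", directory)
        os.close(out_fd)  # the child reopens the output file by path

        # Write the state through the descriptor returned by mkstemp().
        with os.fdopen(in_fd, "w") as stream:
            json.dump(state, stream)

        # check_call raises CalledProcessError if the child exits non-zero.
        subprocess.check_call(
            [sys.executable, "-c", CHILD, "--input", in_path, "--output", out_path]
        )

        with open(out_path) as stream:
            return json.load(stream)


restored = run_in_subprocess({"value": 21})
```

Using files rather than pipes keeps the parent and child decoupled, so the same exchange also works when the child runs on another machine, as in the SSH advice.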

config 🔗

Contains the class and functions to configure bandsaw.

Configuration 🔗

Class that represents a configuration for bandsaw.

add_advice_chain(self, *advices, name='default') 🔗

Add a new advice chain to the configuration.

Each advice chain has a unique name. If multiple chains with the same name are added to the configuration, the last chain overwrites all previous chains.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| *advices | bandsaw.advice.Advice | A tuple of advices for this chain. | () |
| name | str | The name of the advice chain, defaults to 'default' if not specified. | 'default' |

Returns:

| Type | Description |
| --- | --- |
| bandsaw.config.Configuration | The configuration to which the chain was added. |

Source code in bandsaw/config.py
def add_advice_chain(self, *advices, name='default'):
    """
    Add a new advice chain to the configuration.

    Each advice chain has a unique `name`. If multiple chains with the same name
    are added to the configuration, the last chain overwrites all previous chains.

    Args:
        *advices (bandsaw.advice.Advice): A tuple of advices for this chain.
        name (str): The name of the advice chain, defaults to 'default' if not
            specified.

    Returns:
        bandsaw.config.Configuration: The configuration to which the chain was
            added.

    """
    self._advice_chains[name] = advices
    return self
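Since add_advice_chain() returns the configuration, calls can be chained, and a later chain with the same name replaces an earlier one. The sketch below demonstrates this with `MiniConfiguration`, a hypothetical stand-in that copies the two lines of the method body shown above. Its `get_advice_chain()` is assumed to be a plain dictionary lookup, and strings are used in place of real advice instances.

```python
class MiniConfiguration:
    """Hypothetical stand-in mirroring add_advice_chain()/get_advice_chain()."""

    def __init__(self):
        self._advice_chains = {}

    def add_advice_chain(self, *advices, name="default"):
        # Same body as the method above: store the tuple, return self.
        self._advice_chains[name] = advices
        return self

    def get_advice_chain(self, name):
        # Assumption: a dict lookup, so unknown names raise KeyError.
        return self._advice_chains[name]


configuration = (
    MiniConfiguration()
    .add_advice_chain("logging", "caching")
    .add_advice_chain("logging", name="debug")
    .add_advice_chain("caching")  # replaces the first 'default' chain
)

default_chain = configuration.get_advice_chain("default")
debug_chain = configuration.get_advice_chain("debug")
```

Because `name` is keyword-only, every positional argument is treated as an advice, so the chain name can never be confused with one.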

add_extension(self, extension) 🔗

Add an Extension to the configuration.

Extensions are objects that can implement callbacks to be informed by bandsaw about certain conditions, e.g. the creation of new tasks or the final result of an execution.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| extension | bandsaw.extension.Extension | An object implementing the Extension. | required |

Returns:

| Type | Description |
| --- | --- |
| bandsaw.config.Configuration | The configuration to which the extension was added. |

Source code in bandsaw/config.py
def add_extension(self, extension):
    """
    Add an `Extension` to the configuration.

    `Extensions` are objects that can implement callbacks to be informed by
    bandsaw about certain conditions, e.g. the creation of new tasks or the final
    result of an execution.

    Args:
        extension (bandsaw.extension.Extension): An object implementing the
            `Extension`.

    Returns:
        bandsaw.config.Configuration: The configuration to which the extension was
            added.
    """
    self.extensions.append(extension)
    return self

add_modules_for_distribution(self, *modules) 🔗

Add modules that should be included in the distribution archive.

Parameters:

Name Type Description Default
*modules List[str]

Positional string arguments containing the names of the modules that should be included in the distribution archive.

()

Returns:

Type Description
bandsaw.config.Configuration

The configuration with the added modules.

Source code in bandsaw/config.py
def add_modules_for_distribution(self, *modules):
    """
    Add modules that should be included in the distribution archive.

    Args:
        *modules (List[str]): Positional arguments with strings, that contain the
            names of modules, which should be included in the distribution
            archive.

    Returns:
        bandsaw.config.Configuration: The configuration with the added modules.
    """
    self.distribution_modules.extend(modules)
    return self

get_advice_chain(self, name) 🔗

Returns the advice chain with the given name.

Parameters:

Name Type Description Default
name str

Name of the wanted advice chain.

required

Returns:

Type Description
List[bandsaw.advice.Advice]

The advice chain with the given name.

Exceptions:

Type Description
KeyError

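If no chain with the specified name is configured. Note that the source shown below calls dict.get, which returns None for an unknown name instead of raising.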

Source code in bandsaw/config.py
def get_advice_chain(self, name):
    """
    Returns the advice chain with the given name.

    Args:
        name (str): Name of the wanted advice chain.

    Returns:
        List[bandsaw.advice.Advice]: The advice chain with the given name.

    Raises:
        KeyError: If no chain with the specified name is configured.
    """
    return self._advice_chains.get(name)

set_serializer(self, serializer) 🔗

Sets the serializer that defines how tasks and results will be serialized.

Parameters:

Name Type Description Default
serializer bandsaw.serialization.Serializer

The serializer to use for serializing objects.

required

Returns:

Type Description
bandsaw.config.Configuration

The configuration on which the serializer was set.

Source code in bandsaw/config.py
def set_serializer(self, serializer):
    """
    Sets the serializer that defines how tasks and results will be serialized.

    Args:
        serializer (bandsaw.serialization.Serializer): The serializer to use for
            serializing objects.

    Returns:
        bandsaw.config.Configuration: The configuration on which the serializer
            was set.
    """
    self.serializer = serializer
    return self

get_configuration(configuration_module=None) 🔗

Return a configuration.

Parameters:

Name Type Description Default
configuration_module str

The name of the module that contains the configuration. The module needs to define a member 'configuration', which contains an instance of Configuration. If no module name is given, the name is taken from the BANDSAW_CONFIG_MODULE environment variable. If this variable is not set, it defaults to 'bandsaw_config'.

None

Returns:

Type Description
bandsaw.config.Configuration

The configuration.

Exceptions:

Type Description
ModuleNotFoundError

If no module exists with name configuration_module.

LookupError

If the module doesn't contain a variable 'configuration'.

TypeError

If the variable configuration is not of type Configuration.

Source code in bandsaw/config.py
def get_configuration(configuration_module=None):
    """
    Return a configuration.

    Args:
        configuration_module (str): The module name of a module, which contains the
            configuration. The module needs to define a member 'configuration', which
            contains an instance of `Configuration`. If no module name is given, a
            default configuration is returned based on the value of the
            `BANDSAW_CONFIG_MODULE` environment variable. If this variable is not set,
            we default to 'bandsaw_config'.

    Returns:
        bandsaw.config.Configuration: The configuration.

    Raises:
        ModuleNotFoundError: If no module exists with name `configuration_module`.
        LookupError: If the module doesn't contain a variable `configuration`.
        TypeError: If the variable `configuration` is not of type `Configuration`.
    """
    if configuration_module is None:
        default_configuration_module_name = os.getenv(
            CONFIGURATION_MODULE_ENV_VARIABLE,
            CONFIGURATION_MODULE_DEFAULT,
        )
        configuration_module = default_configuration_module_name

    if configuration_module not in _configurations:
        try:
            _load_configuration_module(configuration_module)
        except ModuleNotFoundError:
            logger.warning(
                "No module found for config %s",
                configuration_module,
            )
            raise
    return _configurations[configuration_module]
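
The lookup order for the module name can be illustrated in isolation. This is a sketch of the name resolution only, not of the module loading; the function name resolve_configuration_module and the two constant names are hypothetical, while the values 'BANDSAW_CONFIG_MODULE' and 'bandsaw_config' are taken from the description above.

```python
import os

# Values taken from the documentation above; the constant names are assumptions.
ENV_VARIABLE = 'BANDSAW_CONFIG_MODULE'
DEFAULT_MODULE = 'bandsaw_config'


def resolve_configuration_module(configuration_module=None):
    """Return the module name that would be looked up for a configuration."""
    if configuration_module is None:
        # Fall back to the environment variable, then to the default name.
        return os.getenv(ENV_VARIABLE, DEFAULT_MODULE)
    return configuration_module
```

An explicit argument always wins, the environment variable is consulted only when no argument is given, and the built-in default applies when neither is present.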

context 🔗

Classes that represent the context used in advising tasks.

Context (SerializableValue) 🔗

Class for representing the context for advising tasks.

The context consists of a set of arbitrary key-value mappings that can be used by the Advice classes to store state or communicate with other advices.

attributes property readonly 🔗

A set of arbitrary key-value mappings for the Advice classes.

Advice can add to this mapping and use this as a way of keeping state.

deserialize(values) classmethod 🔗

Returns a new instance of a value from its serialized representation.

Source code in bandsaw/context.py
@classmethod
def deserialize(cls, values):
    return Context(values['attributes'])

serialized(self) 🔗

Returns a serializable representation of the value.

Source code in bandsaw/context.py
def serialized(self):
    data = {
        'attributes': self._attributes,
    }
    return data
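
The two methods are meant to round-trip: whatever serialized() returns can be handed to deserialize() to rebuild an equivalent context. A minimal sketch, assuming a constructor that takes the attributes dictionary (as the deserialize source suggests); SketchContext is a hypothetical stand-in, not the bandsaw class.

```python
class SketchContext:
    """Hypothetical stand-in that mirrors the serialization logic shown above."""

    def __init__(self, attributes=None):
        self._attributes = attributes or {}

    @property
    def attributes(self):
        return self._attributes

    def serialized(self):
        return {'attributes': self._attributes}

    @classmethod
    def deserialize(cls, values):
        return cls(values['attributes'])


context = SketchContext()
context.attributes['cache_hit'] = False  # an advice storing some state
restored = SketchContext.deserialize(context.serialized())
```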

decorator 🔗

Contains decorators for defining individual tasks.

task(*task_args, config=None, chain=None, **task_kwargs) 🔗

Decorator that is used to define a function as a task.

The decorator can be used in two different ways, standalone:

Examples:

>>> @task
... def my_task_function():
...      pass

or with additional configuration.

Examples:

>>> @task(config='my.config')
... def my_task_function():
...      pass

Parameters:

Name Type Description Default
config str

The name of the configuration module to use for this task. If not given, the default configuration is used.

None
chain str

The name of the advice chain to use for advising this task. If not given, 'default' is used.

None
*task_args

Positional args given to the decorator OR the decorated function. If the decorator is used WITHOUT providing additional configuration, task_args contains a tuple with a single item that is the function to be used as a task. If there is additional configuration given, task_args contains the positional arguments of the call of the decorator.

()
**task_kwargs

Keyword args given to the decorator. If the decorator is used WITHOUT providing additional configuration, task_kwargs is an empty dictionary. If there is additional configuration given, task_kwargs contains the keyword arguments of the call of the decorator.

{}

Returns:

Type Description
Callable

Returns a callable that wraps the decorated function.

Exceptions:

Type Description
ModuleNotFoundError

If the configured configuration module does not exist.

ValueError

If the specified advice chain does not exist.

RuntimeError

If the task is configured with multiple positional arguments.

Source code in bandsaw/decorator.py
def task(*task_args, config=None, chain=None, **task_kwargs):
    """
    Decorator that is used to define a function as a task.

    The decorator can be used in two different ways, standalone:

    Example:
        >>> @task
        ... def my_task_function():
        ...      pass

    or with additional configuration.

    Example:
        >>> @task(config='my.config')
        ... def my_task_function():
        ...      pass


    Args:
        config (str): The name of the configuration module to use for this task.
            If not given, the default configuration is used.
        chain (str): The name of the advice chain to use for advising this task.
            If not given, 'default' is used.
        *task_args: Positional args given to the decorator OR the decorated function.
            If the decorator is used WITHOUT providing additional configuration,
            `task_args` contains a tuple with a single item that is the function to be
            used as a task. If there is additional configuration given, `task_args`
            contains the positional arguments of the call of the decorator.
        **task_kwargs: Keyword args given to the decorator.
            If the decorator is used WITHOUT providing additional configuration,
            `task_kwargs` is an empty dictionary. If there is additional configuration
            given, `task_kwargs` contains the keyword arguments of the call of the
            decorator.

    Returns:
        Callable: Returns a callable that wraps the decorated function.

    Raises:
        ModuleNotFoundError: If the configured configuration module does not exist.
        ValueError: If the specified advice chain does not exist.
        RuntimeError: If the task is configured with multiple positional arguments.
    """

    config_module = config
    configuration = get_configuration(config_module)

    chain_name = chain or 'default'
    chain = configuration.get_advice_chain(chain_name)
    if chain is None:
        raise ValueError(f"Unknown advice chain {chain_name}")

    def decorate_function(func):
        logger.info("Decorate function '%s'", func)

        logger.info("Creating task for function '%s'", func)
        the_task = Task.create_task(func, task_kwargs)

        def inner(*func_args, **func_kwargs):

            execution_id = _calculate_execution_id(
                func_args,
                func_kwargs,
                configuration.serializer,
            )
            execution = Execution(execution_id, func_args, func_kwargs)

            result = advise_task_with_chain(
                the_task,
                execution,
                configuration,
                chain_name,
            )
            if result.exception:
                raise result.exception
            return result.value

        inner.__wrapped__ = func
        inner.bandsaw_task = the_task
        inner.bandsaw_configuration = configuration
        return inner

    if len(task_args) == 1 and len(task_kwargs) == 0:
        return decorate_function(task_args[0])
    if len(task_args) == 0 and len(task_kwargs) > 0:
        return decorate_function
    # This shouldn't happen if the decorator is properly used.
    raise RuntimeError("Invalid 'task' decorator.")
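
The general pattern behind a decorator that works both bare and with arguments can be shown without bandsaw. The following is a sketch of that pattern only: traced and its label option are hypothetical names, and unlike the source above this version returns the real decorator whenever no positional argument is given.

```python
import functools


def traced(*args, label=None):
    """Hypothetical decorator usable as @traced or as @traced(label=...)."""

    def decorate(func):
        @functools.wraps(func)
        def inner(*func_args, **func_kwargs):
            # Record which label was in effect, then call the wrapped function.
            inner.calls.append(label or func.__name__)
            return func(*func_args, **func_kwargs)

        inner.calls = []
        return inner

    if len(args) == 1 and callable(args[0]):
        # Bare usage: Python passed the decorated function directly.
        return decorate(args[0])
    if not args:
        # Called usage: return the real decorator for Python to apply next.
        return decorate
    raise RuntimeError("Invalid 'traced' decorator.")


@traced
def add(a, b):
    return a + b


@traced(label='multiply')
def mul(a, b):
    return a * b
```

functools.wraps sets __wrapped__ on the wrapper, which corresponds to the explicit inner.__wrapped__ = func assignment in the source above.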

distribution 🔗

Contains functions for creating distribution archives.

Distribution archives are how bandsaw transfers code between machines. They are ordinary zip files that contain bandsaw itself, a main module that makes the archive executable and able to continue sessions, and possibly some additional dependencies.

DistributionArchive 🔗

Class that represents a distribution archive.

A distribution archive contains all the code necessary for running a task. It can be used for running a task on a different machine by copying over the archive.

Attributes:

Name Type Description
path pathlib.Path

The path to the file containing the code.

path property readonly 🔗

Returns:

Type Description
pathlib.Path

The path to the archive file. The file itself is created lazily when the path is accessed for the first time, so the archive is only created if it is actually needed.

get_distribution_archive(configuration) 🔗

Returns a distribution archive for a given configuration.

Parameters:

Name Type Description Default
configuration bandsaw.config.Configuration

The configuration for which the distribution package should be returned.

required

Returns:

Type Description
bandsaw.distribution.DistributionArchive

The archive for the configuration.

Source code in bandsaw/distribution.py
def get_distribution_archive(configuration):
    """
    Returns a distribution archive for a given configuration.

    Args:
        configuration (bandsaw.config.Configuration): The configuration for which the
            distribution package should be returned.

    Returns:
        bandsaw.distribution.DistributionArchive: The archive for the configuration.
    """
    archive = _CACHE.get_archive(configuration)
    if archive is None:
        archive_path = pathlib.Path(
            tempfile.mktemp(suffix='.pyz', prefix='distribution-')
        )
        modules = [
            '__main__',
            'bandsaw',
            configuration.module_name,
            *configuration.distribution_modules,
        ]
        archive = DistributionArchive(archive_path, *modules)
        _CACHE.put_archive(configuration, archive)
    return archive
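
Note that get_distribution_archive only builds the archive object; according to the description of the path property, the file is written when path is first accessed. A rough sketch of that lazy behaviour using the standard zipfile module follows. LazyArchive, its constructor arguments and the file name are hypothetical and do not reflect how DistributionArchive collects modules.

```python
import pathlib
import tempfile
import zipfile


class LazyArchive:
    """Hypothetical sketch: the zip file is written on first access of path."""

    def __init__(self, path, files):
        self._path = pathlib.Path(path)
        self._files = files  # mapping of archive member name -> text content
        self.created = False

    @property
    def path(self):
        if not self.created:
            # Write the archive only now, when somebody actually needs it.
            with zipfile.ZipFile(self._path, 'w') as archive:
                for name, content in self._files.items():
                    archive.writestr(name, content)
            self.created = True
        return self._path


target = pathlib.Path(tempfile.mkdtemp()) / 'distribution-sketch.pyz'
archive = LazyArchive(target, {'__main__.py': "print('hello')\n"})
exists_before = target.exists()       # nothing has been written yet
exists_after = archive.path.exists()  # accessing path creates the file
```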

execution 🔗

Contains classes and functions around an execution of a task

Execution (SerializableValue) 🔗

Class that defines an execution of a Task.

It contains the arguments that should be used for the task and a unique identifier derived from those arguments.

Attributes:

Name Type Description
execution_id str

A string identifying this execution.

args tuple[Any]

The positional arguments for the task to use in this execution.

kwargs Dict[Any,Any]

The keyword arguments for the task to use in this execution.

deserialize(values) classmethod 🔗

Returns a new instance of a value from its serialized representation.

Source code in bandsaw/execution.py
@classmethod
def deserialize(cls, values):
    return Execution(
        values['execution_id'],
        values['args'],
        values['kwargs'],
    )

serialized(self) 🔗

Returns a serializable representation of the value.

Source code in bandsaw/execution.py
def serialized(self):
    return {
        'execution_id': self.execution_id,
        'args': self.args,
        'kwargs': self.kwargs,
    }

extensions 🔗

Contains an API for extensions that can be used in bandsaw

Extension 🔗

Class that defines the interface of extensions.

An extension can define different callbacks that are called by bandsaw. This makes it possible to extend existing functionality (e.g. by setting additional values in a context before it is handled by all advices) or to integrate other systems. Unlike an Advice, an Extension is defined globally in a config and therefore applies to all tasks.
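
A minimal sketch of such an extension, using the three callbacks of this class (on_init, on_before_advice, on_after_advice). SketchExtension is a hypothetical stand-in for the real base class, CountingExtension is an invented example, and a plain dictionary stands in for the context so that the block runs without bandsaw.

```python
class SketchExtension:
    """Hypothetical base class with no-op versions of the three callbacks."""

    def on_init(self, configuration):
        pass

    def on_before_advice(self, task, execution, context):
        pass

    def on_after_advice(self, task, execution, context, result):
        pass


class CountingExtension(SketchExtension):
    """Counts advised calls and records the running count in the context."""

    def __init__(self):
        self.started = 0
        self.finished = 0

    def on_before_advice(self, task, execution, context):
        self.started += 1
        # An extension may add values before the advices see the context.
        context['started_count'] = self.started

    def on_after_advice(self, task, execution, context, result):
        self.finished += 1


extension = CountingExtension()
context = {}  # stand-in for the attributes of a real Context
extension.on_before_advice(task=None, execution=None, context=context)
extension.on_after_advice(task=None, execution=None, context=context, result=None)
```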

on_after_advice(self, task, execution, context, result) 🔗

Called after bandsaw advises a task.

Parameters:

Name Type Description Default
task bandsaw.tasks.Task

The task which was advised.

required
execution bandsaw.execution.Execution

The execution which contains the parametrization of the task.

required
context bandsaw.context.Context

The context which was used during the advice.

required
result bandsaw.result.Result

The result of the call.

required
Source code in bandsaw/extensions.py
def on_after_advice(self, task, execution, context, result):
    """
    Called after bandsaw advises a task.

    Args:
        task (bandsaw.tasks.Task): The task which was advised.
        execution (bandsaw.execution.Execution): The execution which contains the
            parametrization of the task.
        context (bandsaw.context.Context): The context which was used during the
            advice.
        result (bandsaw.result.Result): The result of the call.
    """

on_before_advice(self, task, execution, context) 🔗

Called before bandsaw advises a task.

Parameters:

Name Type Description Default
task bandsaw.tasks.Task

The task which will be advised.

required
execution bandsaw.execution.Execution

The execution which contains the parametrization of the task.

required
context bandsaw.context.Context

The context which will be used during the advice. The context can be extended by the extension.

required
Source code in bandsaw/extensions.py
def on_before_advice(self, task, execution, context):
    """
    Called before bandsaw advises a task.

    Args:
        task (bandsaw.tasks.Task): The task which will be advised.
        execution (bandsaw.execution.Execution): The execution which contains the
            parametrization of the task.
        context (bandsaw.context.Context): The context which will be used during
            the advice. The context can be extended by the extension.
    """

on_init(self, configuration) 🔗

Called when a bandsaw configuration has been initialized.

Parameters:

Name Type Description Default
configuration bandsaw.config.Configuration

The configuration object which contains the config that has been loaded.

required
Source code in bandsaw/extensions.py
def on_init(self, configuration):
    """
    Called when a bandsaw configuration has been initialized.

    Args:
        configuration (bandsaw.config.Configuration): The configuration object
            which contains the config that has been loaded.
    """

identifier 🔗

Functions for generating identifiers for arbitrary python objects.

identifier_from_bytes(buffer) 🔗

Derive an identifier from a bytebuffer.

Parameters:

Name Type Description Default
buffer Union[bytes,bytearray]

The binary data from which to derive an identifier.

required

Returns:

Type Description
str

The identifier in form of a string of a hexadecimal number.

Source code in bandsaw/identifier.py
def identifier_from_bytes(buffer):
    """
    Derive an identifier from a bytebuffer.

    Args:
        buffer (Union[bytes,bytearray]): The binary data from which to derive an
            identifier.

    Returns:
        str: The identifier in form of a string of a hexadecimal number.
    """
    identifier = hashlib.sha256(buffer).hexdigest()[:_ID_LENGTH]
    return identifier

identifier_from_string(string) 🔗

Derive an identifier from a string.

Parameters:

Name Type Description Default
string str

The string from which to derive an identifier.

required

Returns:

Type Description
str

The identifier in form of a string of a hexadecimal number.

Source code in bandsaw/identifier.py
def identifier_from_string(string):
    """
    Derive an identifier from a string.

    Args:
        string (str): The string from which to derive an identifier.

    Returns:
        str: The identifier in form of a string of a hexadecimal number.
    """
    identifier = identifier_from_bytes(string.encode('utf-8'))
    return identifier
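
Both functions reduce to a truncated SHA-256 hex digest, so a string and its UTF-8 encoding yield the same identifier. The sketch below reproduces the two functions so that it can run on its own; the length of 16 characters is an assumption, because the value of _ID_LENGTH is not shown above.

```python
import hashlib

ID_LENGTH = 16  # assumption: the real value of _ID_LENGTH is not shown above


def identifier_from_bytes(buffer):
    # Hash the bytes and keep only the first ID_LENGTH hex characters.
    return hashlib.sha256(buffer).hexdigest()[:ID_LENGTH]


def identifier_from_string(string):
    # Strings are encoded as UTF-8 before hashing.
    return identifier_from_bytes(string.encode('utf-8'))


from_string = identifier_from_string('my_task')
from_bytes = identifier_from_bytes(b'my_task')
```

The identifiers are deterministic, so the same input always maps to the same identifier across processes and machines.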

interpreter 🔗

Contains classes regarding python interpreters

Interpreter (SerializableValue) 🔗

Class for representing different python interpreters.

This class is used to contain the information about specific python interpreters that are used within the library. In order to support multiple different interpreters there will be the option to define the interpreter as part of config. Currently only a single interpreter is automatically defined.

environment property readonly 🔗

The environment variables to be set for the interpreter.

path property readonly 🔗

The python path items that will be used.

__init__(self, path=None, executable=None) special 🔗

Create a new interpreter instance.

Parameters:

Name Type Description Default
path List[str]

A list of directory paths, to be used as $PYTHONPATH. If None the current sys.path is used.

None
executable str

The path to the python executable for this interpreter. If None the current sys.executable is used.

None
Source code in bandsaw/interpreter.py
def __init__(
    self,
    path=None,
    executable=None,
):
    """
    Create a new interpreter instance.

    Args:
        path (List[str]): A list of directory paths, to be used as $PYTHONPATH.
            If `None` the current `sys.path` is used.
        executable (str): The path to the python executable for this interpreter.
            If `None` the current `sys.executable` is used.
    """
    if path is None:
        self._path = tuple(sys.path)
    else:
        self._path = tuple(path)
    self.executable = executable or sys.executable
    self._environment = {}

deserialize(values) classmethod 🔗

Returns a new instance of a value from its serialized representation.

Source code in bandsaw/interpreter.py
@classmethod
def deserialize(cls, values):
    return Interpreter(
        path=values['path'],
        executable=values['executable'],
    ).set_environment(**values['environment'])

serialized(self) 🔗

Returns a serializable representation of the value.

Source code in bandsaw/interpreter.py
def serialized(self):
    return {
        'path': self._path,
        'executable': self.executable,
        'environment': self._environment,
    }

set_environment(self, **environment) 🔗

Set the environment variables to use for this interpreter.

A call to this method overwrites all variables that have been set previously.

Parameters:

Name Type Description Default
**environment

Arbitrary keyword arguments where the name of the keyword corresponds to the name of the environment variable and the values will be the values set in the environment.

{}
Source code in bandsaw/interpreter.py
def set_environment(self, **environment):
    """
    Set the environment variables to use for this interpreter.

    A call to this method overwrites all variables that have been set previously.

    Args:
        **environment: Arbitrary keyword arguments where the name of the keyword
            corresponds to the name of the environment variable and the values
            will be the values set in the environment.
    """
    self._environment = environment
    return self
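
The overwrite semantics are easy to miss: a second call does not merge with the first one. A small sketch, in which SketchInterpreter is a hypothetical stand-in that copies the constructor and set_environment logic shown above, and the path and variable names are made up:

```python
import sys


class SketchInterpreter:
    """Hypothetical stand-in mirroring the Interpreter logic shown above."""

    def __init__(self, path=None, executable=None):
        self._path = tuple(sys.path) if path is None else tuple(path)
        self.executable = executable or sys.executable
        self._environment = {}

    @property
    def environment(self):
        return self._environment

    def set_environment(self, **environment):
        # The whole mapping is replaced, not updated.
        self._environment = environment
        return self


interpreter = SketchInterpreter(path=['/opt/project'])
interpreter.set_environment(LOG_LEVEL='debug', REGION='eu')
interpreter.set_environment(LOG_LEVEL='info')  # REGION is dropped here
```

To keep earlier variables, pass the complete set of variables in a single call.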

io 🔗

Contains classes related to input/output of data.

BytearrayGeneratorToStream (RawIOBase) 🔗

Stream that reads from a generator that yields bytes/bytearrays.

read_stream_to_generator(stream, buffer_size=8192) 🔗

Read from a stream into a generator yielding bytes.

Parameters:

Name Type Description Default
stream io.Stream

The stream to read bytes from.

required
buffer_size int

The buffer size to read. Each individual bytes buffer returned by the generator has a maximum size of buffer_size. Defaults to 8192 if not set.

8192

Yields

bytes: A byte buffer of maximum size of buffer_size.

Source code in bandsaw/io.py
def read_stream_to_generator(stream, buffer_size=8192):
    """
    Read from a stream into a generator yielding `bytes`.

    Args:
        stream (io.Stream): The stream to read bytes from.
        buffer_size (int): The buffer size to read. Each individual `bytes` buffer
            returned by the generator has a maximum size of `buffer_size`. Defaults
            to `8192` if not set.

    Yields:
        bytes: A byte buffer of maximum size of `buffer_size`.

    """
    buffer = bytearray(buffer_size)
    while True:
        read = stream.readinto(buffer)
        if read:
            yield buffer[:read]
        else:
            break
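
To see how the chunking behaves, the function can be run against an in-memory stream. The block repeats the function body from above so that it is self-contained; the 20-byte payload and the buffer size of 8 are arbitrary illustration values.

```python
import io


def read_stream_to_generator(stream, buffer_size=8192):
    buffer = bytearray(buffer_size)
    while True:
        # readinto() fills the buffer and reports how many bytes were read.
        read = stream.readinto(buffer)
        if read:
            yield buffer[:read]
        else:
            break


data = bytes(range(20))
chunks = list(read_stream_to_generator(io.BytesIO(data), buffer_size=8))
sizes = [len(chunk) for chunk in chunks]
```

Each yielded chunk is a slice, and therefore a copy, of the internal buffer, so collecting the chunks in a list is safe even though the buffer is reused on the next read.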

modules 🔗

Utility functions for handling python modules.

get_loaded_module_name_by_path(file_path) 🔗

Determine the name of an already loaded module by its file path.

Parameters:

Name Type Description Default
file_path str

File path of the python file containing the module.

required

Returns:

Type Description
str

The name of the module, or None if the file isn't loaded as a module.

Source code in bandsaw/modules.py
def get_loaded_module_name_by_path(file_path):
    """
    Determine the name of an already loaded module by its file path.

    Args:
        file_path (str): File path of the python file containing the module.

    Returns:
        str: The name of the module, or `None` if the file isn't loaded as a module.

    """
    real_path = os.path.realpath(file_path)
    for name, module in sys.modules.items():
        if hasattr(module, '__file__'):
            module_path = os.path.realpath(module.__file__)
            if module_path == real_path:
                return name
    return None
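
A usage sketch with a standard library module. The function below is a slight variation of the source above rather than a copy: find_loaded_module_name is a hypothetical name, it iterates over a copy of sys.modules, and it skips modules whose __file__ is missing or None.

```python
import json
import os
import sys


def find_loaded_module_name(file_path):
    """Variation of the function above that skips modules without a file."""
    real_path = os.path.realpath(file_path)
    # Iterate over a copy so that concurrent imports cannot change the
    # dictionary while it is being traversed.
    for name, module in list(sys.modules.items()):
        module_file = getattr(module, '__file__', None)
        if module_file and os.path.realpath(module_file) == real_path:
            return name
    return None


found = find_loaded_module_name(json.__file__)
missing = find_loaded_module_name('/no/such/file.py')
```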

import_object(object_name, module_name) 🔗

Import a python object from a module.

Parameters:

Name Type Description Default
object_name str

The name under which the object is defined in the module.

required
module_name str

The name of the module from which the object should be imported.

required

Returns:

Type Description
object

The python object defined under the name.

Exceptions:

Type Description
AttributeError

If nothing is defined with name object_name in the referenced module.

ModuleNotFoundError

If no module exists with the name module_name.

Source code in bandsaw/modules.py
def import_object(object_name, module_name):
    """
    Import a python object from a module.

    Args:
        object_name (str): The name under which the object is defined in the module.
        module_name (str): The name of the module from which the object should be
            imported.

    Returns:
        object: The python object defined under the name.

    Raises:
        AttributeError: If nothing is defined with name `object_name` in the
            referenced module.
        ModuleNotFoundError: If no module exists with the name `module_name`.
    """
    module = importlib.import_module(module_name)
    return getattr(module, object_name)
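
A short usage sketch with the standard library. The function body is repeated from above so that the block runs on its own; math.sqrt is just a convenient object to import.

```python
import importlib


def import_object(object_name, module_name):
    # Import the module by name, then look the object up as an attribute.
    module = importlib.import_module(module_name)
    return getattr(module, object_name)


sqrt = import_object('sqrt', 'math')
root = sqrt(9.0)
```

Note the argument order: the object name comes first and the module name second.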

object_as_import(obj) 🔗

Returns the name and module of an object, which can be used for importing it.

Parameters:

Name Type Description Default
obj object

An arbitrary Python object.

required

Returns:

Type Description
Tuple(str, str)

Returns a tuple of the object name and the module name in which the object is defined.

Exceptions:

Type Description
ValueError

If obj doesn't have a name that can be directly imported, e.g. because it is defined within a local class.

Note

If obj is defined within the __main__ script, the function tries to determine a name for the __main__ module, under which it could be imported from other scripts.

Source code in bandsaw/modules.py
def object_as_import(obj):
    """
    Returns the name and module of an object, which can be used for importing it.

    Args:
        obj (object): An arbitrary Python object.

    Returns:
        Tuple(str, str): Returns a tuple of the object name and the module name in
            which the object is defined.

    Raises:
        ValueError: If `obj` doesn't have a name that can be directly imported, e.g.
            because it is defined within a local class.

    Note:
        If `obj` is defined within the `__main__` script, the function tries to
        determine a name for the `__main__` module, under which it could be imported
        from other scripts.

    """
    object_name = obj.__name__
    module_name = obj.__module__

    if module_name == '__main__':
        module_file_path = sys.modules['__main__'].__file__
        module_name = _guess_module_name_by_path(module_file_path, sys.path)

    if '<locals>' in obj.__qualname__:
        raise ValueError("Can't import local functions.")
    return object_name, module_name
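
The two outcomes, an importable name or a ValueError, can be illustrated with a simplified version. object_as_import_sketch is a hypothetical reduction of the function above that leaves out the `__main__` handling, because the helper used for guessing the module name is not shown here.

```python
import json


def object_as_import_sketch(obj):
    """Simplified variant of the logic above without the __main__ handling."""
    if '<locals>' in obj.__qualname__:
        raise ValueError("Can't import local functions.")
    return obj.__name__, obj.__module__


def make_local():
    def local_function():
        pass

    return local_function


importable = object_as_import_sketch(json.dumps)

try:
    object_as_import_sketch(make_local())
    local_error = None
except ValueError as error:
    local_error = str(error)
```

The returned pair is in the same order that import_object expects, so the result of one function can be passed to the other.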

result 🔗

Contains code for representing the result of tasks.

Result (SerializableValue) 🔗

Class to encapsulate the result of a task execution.

Attributes:

Name Type Description
value Any

The value that is returned by the task. None if the task raised an exception during execution.

exception Exception

The exception that was raised during execution, None if no exception was raised.

deserialize(values) classmethod 🔗

Returns a new instance of a value from its serialized representation.

Source code in bandsaw/result.py
@classmethod
def deserialize(cls, values):
    value = values["value"]
    exception = values["exception"]
    return Result(value=value, exception=exception)

serialized(self) 🔗

Returns a serializable representation of the value.

Source code in bandsaw/result.py
def serialized(self):
    values = {
        "value": self.value,
        "exception": self.exception,
    }
    return values

run 🔗

Functions for managing the run id.

get_run_id() 🔗

Returns the run id.

The run id is a unique identifier that is specific to an individual run of a workflow. It stays the same across all task executions and can be used for tracking metrics and differentiating between different runs of the same workflow where task_id and execution_id stay the same.

Returns:

Type Description
str

The unique run id.

Source code in bandsaw/run.py
def get_run_id():
    """
    Returns the run id.

    The run id is a unique identifier that is specific to an individual run of a
    workflow. It stays the same across all task executions and can be used for
    tracking metrics and differentiating between different runs of the same workflow
    where task_id and execution_id stay the same.

    Returns:
        str: The unique run id.
    """
    if _RUN_ID is None:
        set_run_id(str(uuid.uuid1()))
    return _RUN_ID

set_run_id(run_id) 🔗

Sets the run id.

Setting the run id explicitly is usually not necessary. The function is mainly used when task executions are run in a different process to make sure the run id is consistent with the spawning process, but it can be used e.g. if an external system provides a unique identifier for a specific workflow run.

When set_run_id(run_id) is being used, it must be run before the first tasks are actually defined.

Exceptions:

Type Description
RuntimeError

If the run id was already set before.

Source code in bandsaw/run.py
def set_run_id(run_id):
    """
    Sets the run id.

    Setting the run id explicitly is usually not necessary. The function is mainly
    used when task executions are run in a different process to make sure the run id
    is consistent with the spawning process, but it can be used e.g. if an external
    system provides a unique identifier for a specific workflow run.

    When `set_run_id(run_id)` is being used, it must be run before the first tasks
    are actually defined.

    Raises:
        RuntimeError: If the run id was already set before.
    """
    global _RUN_ID  # pylint: disable=global-statement
    if _RUN_ID is not None:
        logger.error("run_id already set to %s when trying to set again", _RUN_ID)
        raise RuntimeError("Run ID was already set")
    logger.info("Set run_id to %s", run_id)
    _RUN_ID = run_id
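
The interplay of the two functions is a set-once pattern: the first call to get_run_id generates an id, and any later attempt to set one fails. The sketch below repeats the logic from above without logging so that it runs on its own; 'external-id' is a made-up value.

```python
import uuid

_RUN_ID = None


def set_run_id(run_id):
    global _RUN_ID
    if _RUN_ID is not None:
        raise RuntimeError("Run ID was already set")
    _RUN_ID = run_id


def get_run_id():
    if _RUN_ID is None:
        # No id was provided from outside, so generate one on first use.
        set_run_id(str(uuid.uuid1()))
    return _RUN_ID


first = get_run_id()
second = get_run_id()

try:
    set_run_id('external-id')
    error = None
except RuntimeError as exc:
    error = str(exc)
```

This is why an externally provided run id has to be set before anything asks for the current one.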

runner 🔗

Contains main() function to continue sessions from files

main(args) 🔗

Main function that can be used for proceeding a session.

This function reads a session from a file, proceeds it until it returns, and then saves the state of the session to a new file. It is used for running tasks in a separate process or on different machines.

Parameters:

Name Type Description Default
args tuple[str]

The arguments taken from the command line.

required
Source code in bandsaw/runner.py
def main(args):
    """
    Main function that can be used for proceeding a session.

    This function allows to read a session from a file, proceed it until it returns
    and then save the state of the session to a new file. It is used for running
    tasks in a separate process or on different machines.

    Args:
        args (tuple[str]): The arguments taken from the command line.
    """
    hostname = os.uname()[1]
    log_format = (
        f"{{asctime}} {hostname} {{process: >5d}} {{thread: >5d}} "
        f"{{name}} {{levelname}}: {{message}}"
    )
    logging.basicConfig(level=logging.INFO, format=log_format, style='{')

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input_session',
        help="The session which should be continued",
        required=True,
    )
    parser.add_argument(
        '--output',
        dest='output_session',
        help="The session after continuation ended",
        required=True,
    )
    parser.add_argument(
        '--run-id',
        dest='run_id',
        help="The run id of the workflow",
        required=True,
    )
    args = parser.parse_args(args=args)

    set_run_id(args.run_id)

    logger.info("Creating new session")
    session = Session()

    logger.info("Reading session from %s", args.input_session)
    with io.FileIO(args.input_session, mode='r') as stream:
        session.restore(stream)

    logger.info("Proceeding session")
    session.proceed()

    logger.info("Writing session with result to %s", args.output_session)
    with io.FileIO(args.output_session, mode='w') as stream:
        session.save(stream)
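The command line accepted by `main(args)` can be explored without running a session. The following sketch rebuilds only the argument parser from the listing above; the helper name `parse_runner_args`, the file names and the run id are placeholders chosen for illustration.

```python
import argparse


def parse_runner_args(argv):
    """Parse the three required options that the runner expects."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input_session', required=True)
    parser.add_argument('--output', dest='output_session', required=True)
    parser.add_argument('--run-id', dest='run_id', required=True)
    return parser.parse_args(args=argv)


# Placeholder file names and run id.
options = parse_runner_args(
    [
        '--input', 'session-in.zip',
        '--output', 'session-out.zip',
        '--run-id', 'build-1234',
    ]
)
print(options.input_session, options.output_session, options.run_id)
```

All three options are required, so leaving one out makes `argparse` print a usage message and exit.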

serialization special 🔗

Module for the package bandsaw.serialization. Contains all public types.

json 🔗

Contains a Serializer that serializes objects to JSON.

JsonSerializer (Serializer) 🔗

A Serializer which serializes objects to JSON.

Attributes:

Name Type Description
value_serializers List[ValueSerializer]

A list of serializers that are used for serialization of custom types.

deserialize(self, stream) 🔗

Deserialize a value/object from a binary stream.

Parameters:

Name Type Description Default
stream io.Stream

The binary stream from where the serialized value is read.

required

Returns:

Type Description
Any

The object/value which was deserialized.

Source code in bandsaw/serialization/json.py
def deserialize(self, stream):
    """
    Deserialize a value/object from a binary stream.

    Args:
        stream (io.Stream): The binary stream from where the serialized value is
            read.

    Returns:
         Any: The object/value which was deserialized.
    """
    text_stream = io.TextIOWrapper(stream, encoding='utf-8')
    result = json.load(
        text_stream,
        cls=_ExtensibleJSONDecoder,
        value_serializers=self.value_serializers,
    )
    text_stream.detach()
    return result
serialize(self, value, stream) 🔗

Serialize a value/object into a binary stream.

Parameters:

Name Type Description Default
value Any

The object/value to be serialized.

required
stream io.Stream

The binary stream where the serialized value is written.

required
Source code in bandsaw/serialization/json.py
def serialize(self, value, stream):
    """
    Serialize a value/object into a binary stream.

    Args:
        value (Any): The object/value to be serialized.
        stream (io.Stream): The binary stream where the serialized value is
            written.
    """
    text_stream = io.TextIOWrapper(stream, encoding='utf-8')
    json.dump(
        value,
        text_stream,
        cls=_ExtensibleJSONEncoder,
        value_serializers=self.value_serializers,
        allow_nan=False,
        indent=None,
        separators=(',', ':'),
        sort_keys=True,
    )
    text_stream.detach()
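Both methods wrap the binary stream in a `TextIOWrapper` and detach it afterwards, so the caller's stream stays open. The sketch below shows that pattern with the standard `json` module only; it leaves out the custom encoder, decoder and `value_serializers`, and the helper names `dump_json` and `load_json` are hypothetical.

```python
import io
import json


def dump_json(value, stream):
    """Write compact, key-sorted JSON as UTF-8 into a binary stream."""
    text_stream = io.TextIOWrapper(stream, encoding='utf-8')
    json.dump(
        value,
        text_stream,
        allow_nan=False,
        indent=None,
        separators=(',', ':'),
        sort_keys=True,
    )
    # detach() flushes pending text and releases the binary stream unclosed.
    text_stream.detach()


def load_json(stream):
    """Read a JSON document from a binary stream."""
    text_stream = io.TextIOWrapper(stream, encoding='utf-8')
    result = json.load(text_stream)
    text_stream.detach()
    return result


buffer = io.BytesIO()
dump_json({'b': [1, 2], 'a': 1}, buffer)
payload = buffer.getvalue()  # b'{"a":1,"b":[1,2]}'

buffer.seek(0)
restored = load_json(buffer)
```

Sorting the keys and fixing the separators makes the output deterministic for equal inputs, which is useful when the serialized bytes are hashed or compared.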

pickle 🔗

Contains a Serializer which uses pickle for serializing values.

PickleSerializer (Serializer) 🔗

A Serializer which serializes objects using pickle.

deserialize(self, stream) 🔗

Deserialize a value/object from a binary stream.

Parameters:

Name Type Description Default
stream io.Stream

The binary stream from where the serialized value is read.

required

Returns:

Type Description
Any

The object/value which was deserialized.

Source code in bandsaw/serialization/pickle.py
def deserialize(self, stream):
    """
    Deserialize a value/object from a binary stream.

    Args:
        stream (io.Stream): The binary stream from where the serialized value is
            read.

    Returns:
         Any: The object/value which was deserialized.
    """
    return pickle.load(stream)
serialize(self, value, stream) 🔗

Serialize a value/object into a binary stream.

Parameters:

Name Type Description Default
value Any

The object/value to be serialized.

required
stream io.Stream

The binary stream where the serialized value is written.

required
Source code in bandsaw/serialization/pickle.py
def serialize(self, value, stream):
    """
    Serialize a value/object into a binary stream.

    Args:
        value (Any): The object/value to be serialized.
        stream (io.Stream): The binary stream where the serialized value is
            written.
    """
    pickle.dump(value, stream)
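Since the serializer delegates directly to `pickle.dump` and `pickle.load`, its behavior can be illustrated with the standard library alone. This small sketch writes two values into one in-memory stream and reads them back in the same order; the values are arbitrary examples.

```python
import io
import pickle

buffer = io.BytesIO()
pickle.dump({'answer': 42}, buffer)
pickle.dump(('a', 'tuple'), buffer)

# Each load() consumes exactly one pickled object from the stream.
buffer.seek(0)
first = pickle.load(buffer)
second = pickle.load(buffer)
```

Unlike JSON, pickle preserves tuples and many custom types without extra value serializers, but pickled data must only be loaded from trusted sources.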

serializer 🔗

Base classes for serializers that serialize Python values.

Serializer (ABC) 🔗

Interface for serializers which serialize objects.
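A custom serializer only has to implement the two stream-based methods. The sketch below uses a stand-in `Serializer` base class that mirrors the documented interface so it runs without bandsaw installed; `LiteralSerializer` is a hypothetical example, not part of the library.

```python
import abc
import ast
import io


class Serializer(abc.ABC):
    """Stand-in mirroring the documented interface."""

    @abc.abstractmethod
    def serialize(self, value, stream):
        """Write `value` into the binary `stream`."""

    @abc.abstractmethod
    def deserialize(self, stream):
        """Read a value back from the binary `stream`."""


class LiteralSerializer(Serializer):
    """Hypothetical serializer that stores Python literals as UTF-8 text."""

    def serialize(self, value, stream):
        stream.write(repr(value).encode('utf-8'))

    def deserialize(self, stream):
        # literal_eval only accepts literals, so no code is executed.
        return ast.literal_eval(stream.read().decode('utf-8'))


buffer = io.BytesIO()
serializer = LiteralSerializer()
serializer.serialize({'x': (1, 2.5), 'ok': True}, buffer)

buffer.seek(0)
restored = serializer.deserialize(buffer)
```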

deserialize(self, stream) 🔗

Deserialize a value/object from a binary stream.

Parameters:

Name Type Description Default
stream io.Stream

The binary stream from where the serialized value is read.

required

Returns:

Type Description
Any

The object/value which was deserialized.

Source code in bandsaw/serialization/serializer.py
@abc.abstractmethod
def deserialize(self, stream):
    """
    Deserialize a value/object from a binary stream.

    Args:
        stream (io.Stream): The binary stream from where the serialized value is
            read.

    Returns:
         Any: The object/value which was deserialized.
    """
serialize(self, value, stream) 🔗

Serialize a value/object into a binary stream.

Parameters:

Name Type Description Default
value Any

The object/value to be serialized.

required
stream io.Stream

The binary stream where the serialized value is written.

required
Source code in bandsaw/serialization/serializer.py
@abc.abstractmethod
def serialize(self, value, stream):
    """
    Serialize a value/object into a binary stream.

    Args:
        value (Any): The object/value to be serialized.
        stream (io.Stream): The binary stream where the serialized value is
            written.
    """

values 🔗

A collection of classes for serializing custom objects.

ExceptionSerializer (ValueSerializer) 🔗

A ValueSerializer for serializing exceptions.

The serializer saves only the type and the args attribute of the exception. It therefore won't work for all exception types, but it should cover most of them. Other attributes of the exception, such as the stack trace, are discarded.

can_serialize_value(self, value) 🔗

Returns whether a serializer can serialize a specific value.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
boolean

True if this serializer can serialize the given value, otherwise False.

Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
    return isinstance(value, Exception)
deserialize_value(self, representation) 🔗

Returns a deserialized value from its serialized representation.

Parameters:

Name Type Description Default
representation Any

The serialized representation of the value.

required

Returns:

Type Description
Any

The deserialized value.

Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
    module_name = representation['module']
    type_name = representation['type']
    module = importlib.import_module(module_name)
    value_type = getattr(module, type_name)
    return value_type(*representation['args'])
serialize_value(self, value) 🔗

Returns a serialized representation of the given value.

The returned representation can use standard python types like primitive values, lists or dicts.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
Any

The serialized representation of the value.

Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
    state = {
        'type': type(value).__name__,
        'module': type(value).__module__,
        'args': value.args,
    }
    return state
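The round trip and its limits can be sketched with two standalone functions that follow the listings above. The function names are hypothetical; `json.JSONDecodeError` serves as an example of an exception type that cannot be rebuilt from its `args` alone, because its constructor expects more arguments than it stores there.

```python
import importlib
import json


def serialize_exception(value):
    """Keep only the type, its module and the args of an exception."""
    return {
        'type': type(value).__name__,
        'module': type(value).__module__,
        'args': value.args,
    }


def deserialize_exception(representation):
    """Recreate the exception by calling its type with the stored args."""
    module = importlib.import_module(representation['module'])
    value_type = getattr(module, representation['type'])
    return value_type(*representation['args'])


try:
    int("not a number")
except ValueError as error:
    original = error

restored = deserialize_exception(serialize_exception(original))
print(type(restored).__name__, restored.args == original.args)
print(original.__traceback__ is not None, restored.__traceback__ is None)

# An exception whose constructor signature differs from its args fails.
try:
    json.loads("{")
except json.JSONDecodeError as error:
    state = serialize_exception(error)

try:
    deserialize_exception(state)
    rebuilt = True
except TypeError:
    rebuilt = False
```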

SerializableValue (ABC) 🔗

Interface for types that can serialize themselves.
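A class opts in by implementing `serialized()` and the `deserialize()` classmethod. The sketch below uses a stand-in base class with the same two abstract methods so that it runs on its own; `Point` is a hypothetical example type.

```python
import abc


class SerializableValue(abc.ABC):
    """Stand-in mirroring the documented interface."""

    @abc.abstractmethod
    def serialized(self):
        """Returns a serializable representation of the value."""

    @classmethod
    @abc.abstractmethod
    def deserialize(cls, values):
        """Returns a new instance from its serialized representation."""


class Point(SerializableValue):
    """Hypothetical value type that knows how to serialize itself."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def serialized(self):
        # Only plain Python types, so any serializer can store the result.
        return {'x': self.x, 'y': self.y}

    @classmethod
    def deserialize(cls, values):
        return cls(values['x'], values['y'])

    def __eq__(self, other):
        return isinstance(other, Point) and (self.x, self.y) == (other.x, other.y)


state = Point(1, 2).serialized()
copy = Point.deserialize(state)
```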

deserialize(values) classmethod 🔗

Returns a new instance of a value from its serialized representation.

Source code in bandsaw/serialization/values.py
@classmethod
@abc.abstractmethod
def deserialize(cls, values):
    """Returns a new instance of a value from its serialized representation."""
serialized(self) 🔗

Returns a serializable representation of the value.

Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def serialized(self):
    """Returns a serializable representation of the value."""

SerializableValueSerializer (ValueSerializer) 🔗

A ValueSerializer for serializing subclasses of SerializableValue.

The serializer uses the methods defined in SerializableValue and implemented by the individual classes to serialize values. It stores the type of the value and its serialized representation, from which the value can be recreated.

can_serialize_value(self, value) 🔗

Returns whether a serializer can serialize a specific value.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
boolean

True if this serializer can serialize the given value, otherwise False.

Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
    return isinstance(value, SerializableValue)
deserialize_value(self, representation) 🔗

Returns a deserialized value from its serialized representation.

Parameters:

Name Type Description Default
representation Any

The serialized representation of the value.

required

Returns:

Type Description
Any

The deserialized value.

Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
    module_name = representation['module']
    type_name = representation['type']
    module = importlib.import_module(module_name)
    value_type = getattr(module, type_name)
    return value_type.deserialize(representation['serialized'])
serialize_value(self, value) 🔗

Returns a serialized representation of the given value.

The returned representation can use standard python types like primitive values, lists or dicts.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
Any

The serialized representation of the value.

Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
    state = {
        'type': type(value).__name__,
        'module': type(value).__module__,
        'serialized': value.serialized(),
    }
    return state

TupleSerializer (ValueSerializer) 🔗

A ValueSerializer for serializing tuples.

The serializer supports normal tuples as well as named tuples. When a namedtuple is deserialized, the serializer first tries to reuse the existing namedtuple type. If the type can't be imported, a new namedtuple type with the same name and fields is created on the fly.

can_serialize_value(self, value) 🔗

Returns whether a serializer can serialize a specific value.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
boolean

True if this serializer can serialize the given value, otherwise False.

Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
    return isinstance(value, tuple)
deserialize_value(self, representation) 🔗

Returns a deserialized value from its serialized representation.

Parameters:

Name Type Description Default
representation Any

The serialized representation of the value.

required

Returns:

Type Description
Any

The deserialized value.

Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
    if representation['type'] == 'namedtuple':
        # try to import the namedtuple type
        module_name = representation['module']
        type_name = representation['name']
        try:
            module = importlib.import_module(module_name)
            tuple_type = getattr(module, type_name)
        except (ImportError, AttributeError) as error:
            logger.warning(
                "Error importing namedtuple, trying to recreate it: %s", error
            )
            # Recreate a new type
            field_names = ' '.join(representation['fields'])
            tuple_type = collections.namedtuple(
                type_name, field_names, module=module_name
            )
        return tuple_type(*representation['items'])

    return tuple(representation['items'])
serialize_value(self, value) 🔗

Returns a serialized representation of the given value.

The returned representation can use standard python types like primitive values, lists or dicts.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
Any

The serialized representation of the value.

Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
    if hasattr(value, '_fields'):
        state = {
            'type': 'namedtuple',
            'fields': list(value._fields),
            'name': type(value).__name__,
            'module': type(value).__module__,
            'items': list(value),
        }
    else:
        state = {
            'type': 'tuple',
            'items': list(value),
        }

    return state
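The three cases (plain tuple, importable namedtuple, namedtuple whose type is gone) can be sketched with standalone functions that follow the listings above. The function names are hypothetical, `urllib.parse.SplitResult` stands in for an importable namedtuple type, and `no_such_module_for_demo` is a deliberately missing module that triggers the fallback.

```python
import collections
import importlib
from urllib.parse import urlsplit


def serialize_tuple(value):
    if hasattr(value, '_fields'):
        return {
            'type': 'namedtuple',
            'fields': list(value._fields),
            'name': type(value).__name__,
            'module': type(value).__module__,
            'items': list(value),
        }
    return {'type': 'tuple', 'items': list(value)}


def deserialize_tuple(representation):
    if representation['type'] != 'namedtuple':
        return tuple(representation['items'])
    module_name = representation['module']
    type_name = representation['name']
    try:
        module = importlib.import_module(module_name)
        tuple_type = getattr(module, type_name)
    except (ImportError, AttributeError):
        # Type not available here: recreate it from the stored fields.
        field_names = ' '.join(representation['fields'])
        tuple_type = collections.namedtuple(
            type_name, field_names, module=module_name
        )
    return tuple_type(*representation['items'])


plain = deserialize_tuple(serialize_tuple((1, 'two')))

url = urlsplit('https://example.com/docs?page=2')
reused = deserialize_tuple(serialize_tuple(url))

orphan_state = {
    'type': 'namedtuple',
    'fields': ['x', 'y'],
    'name': 'Point',
    'module': 'no_such_module_for_demo',
    'items': [1, 2],
}
recreated = deserialize_tuple(orphan_state)
```

A recreated type has the same name and fields as the original, but it is a new class, so `isinstance` checks against the original type will not match.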

ValueSerializer (ABC) 🔗

Interface for serializers that can serialize custom values.
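Support for an additional type amounts to implementing the three methods of this interface. The sketch below defines a stand-in base class so that it runs without bandsaw; `DateSerializer` and the lookup helper `find_serializer` are hypothetical, and the first-match lookup is an assumption about how a list of value serializers might be consulted.

```python
import abc
import datetime


class ValueSerializer(abc.ABC):
    """Stand-in mirroring the documented interface."""

    @abc.abstractmethod
    def can_serialize_value(self, value):
        """Returns whether this serializer handles `value`."""

    @abc.abstractmethod
    def serialize_value(self, value):
        """Returns a representation built from standard Python types."""

    @abc.abstractmethod
    def deserialize_value(self, representation):
        """Returns the value rebuilt from its representation."""


class DateSerializer(ValueSerializer):
    """Hypothetical serializer for plain `datetime.date` values."""

    def can_serialize_value(self, value):
        # Exact type check, so datetime.datetime instances are not claimed.
        return type(value) is datetime.date

    def serialize_value(self, value):
        return {'iso': value.isoformat()}

    def deserialize_value(self, representation):
        return datetime.date.fromisoformat(representation['iso'])


def find_serializer(value, serializers):
    """Return the first serializer that accepts `value`, else None."""
    for serializer in serializers:
        if serializer.can_serialize_value(value):
            return serializer
    return None


serializers = [DateSerializer()]
day = datetime.date(2021, 10, 16)
chosen = find_serializer(day, serializers)
state = chosen.serialize_value(day)
restored = chosen.deserialize_value(state)
```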

can_serialize_value(self, value) 🔗

Returns whether a serializer can serialize a specific value.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
boolean

True if this serializer can serialize the given value, otherwise False.

Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def can_serialize_value(self, value):
    """
    Returns whether a serializer can serialize a specific value.

    Args:
        value (Any): The value that should be serialized.

    Returns:
        boolean: `True` if this serializer can serialize the given value,
            otherwise `False`.
    """
deserialize_value(self, representation) 🔗

Returns a deserialized value from its serialized representation.

Parameters:

Name Type Description Default
representation Any

The serialized representation of the value.

required

Returns:

Type Description
Any

The deserialized value.

Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def deserialize_value(self, representation):
    """
    Returns a deserialized value from its serialized representation.

    Args:
        representation (Any): The serialized representation of the value.

    Returns:
        Any: The deserialized value.
    """
serialize_value(self, value) 🔗

Returns a serialized representation of the given value.

The returned representation can use standard python types like primitive values, lists or dicts.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
Any

The serialized representation of the value.

Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def serialize_value(self, value):
    """
    Returns a serialized representation of the given value.

    The returned representation can use standard python types like primitive
    values, lists or dicts.

    Args:
        value (Any): The value that should be serialized.

    Returns:
        Any: The serialized representation of the value.
    """

session 🔗

Contains classes for representing an advising session

Session 🔗

Class that handles the advising of an execution.

A Session object is given to the individual advices that are called to advise the execution. By calling the appropriate methods, proceed() to continue or conclude() to end with a result, the advices can influence the final result. Additionally, the session provides access to the context, which allows advices to keep state, the execution that is advised, the configuration that is used for advising and the result of the execution.

Attributes:

Name Type Description
task bandsaw.tasks.Task

The task that is executed.

execution bandsaw.execution.Execution

The execution arguments for the task.

context bandsaw.context.Context

The context that can be used for advices to store state.

result bandsaw.result.Result

Result of the task if already computed. Otherwise None.

_configuration bandsaw.config.Configuration

The configuration that is being used for advising this task.

distribution_archive property readonly 🔗

The DistributionArchive which can be used when transferring the session.

run_id property readonly 🔗

The run id of the workflow.

serializer property readonly 🔗

The serializer that can be used for serializing values.

__init__(self, task=None, execution=None, configuration=None, advice_chain='default') special 🔗

Create a new session.

Source code in bandsaw/session.py
def __init__(
    self,
    task=None,
    execution=None,
    configuration=None,
    advice_chain='default',
):
    """
    Create a new session.


    """
    self.task = task
    self.execution = execution
    self.context = {}
    self.result = None
    self._configuration = configuration
    self._advice_chain = advice_chain
    self._moderator = None

conclude(self, result) 🔗

Conclude the process of advising with a Result.

This can be used in two cases:

  1. Concluding BEFORE the task was actually executed. This will skip all subsequent advices defined later in the advice chain and will skip the task execution. The given result will then be used as preliminary result. All advices that are defined before the calling advice in the advice chain will still be called with their after(session) method.

  2. Concluding AFTER the task was actually executed. This will just change the result of the session and continue with all following advices.

Parameters:

Name Type Description Default
result bandsaw.result.Result

The result to conclude with.

required
Source code in bandsaw/session.py
def conclude(self, result):
    """
    Conclude the process of advising with a `Result`.

    This can be used in two cases:

    1. Concluding BEFORE the task was actually executed. This will skip all
       subsequent advices defined later in the advice chain and will skip the
       task execution. The given `result` will then be used as preliminary result.
       All advices that are defined before the calling advice in the advice chain
       will still be called with their `after(session)` method.

    2. Concluding AFTER the task was actually executed. This will just change the
       `result` of the session and continue with all following advices.

    Args:
        result (bandsaw.result.Result): The result to conclude with.
    """
    self.result = result
    self._moderator.skip(self)
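The two cases described above can be made concrete with a toy model of the advising flow. This is a simplified sketch, not bandsaw's `_Moderator`: `MiniSession`, `Trace` and `Memo` are hypothetical names, and the sketch assumes that `after()` is called in reverse chain order and that an advice which concludes in `before()` does not get its own `after()` call.

```python
class Result:
    """Stand-in for a result that holds only a value."""

    def __init__(self, value):
        self.value = value


class MiniSession:
    def __init__(self, task, advices):
        self.task = task
        self.result = None
        self.events = []
        self._before = list(advices)  # advices whose before() is pending
        self._after = []              # advices whose after() is pending
        self._task_done = False

    def initiate(self):
        self.proceed()
        return self.result

    def proceed(self):
        if self._before:
            advice = self._before.pop(0)
            self._after.insert(0, advice)  # assumption: reverse order
            advice.before(self)
        elif not self._task_done:
            self._task_done = True
            self.events.append("task")
            self.result = Result(self.task())
            self.proceed()
        elif self._after:
            self._after.pop(0).after(self)

    def conclude(self, result):
        self.result = result
        if not self._task_done:
            # Case 1: skip later advices, the task and the caller's after().
            self._before.clear()
            self._task_done = True
            self._after.pop(0)
        self.proceed()


class Trace:
    def __init__(self, name):
        self.name = name

    def before(self, session):
        session.events.append(f"before {self.name}")
        session.proceed()

    def after(self, session):
        session.events.append(f"after {self.name}")
        session.proceed()


class Memo:
    """Remembers the first result and concludes early afterwards."""

    def __init__(self):
        self.cached = None

    def before(self, session):
        if self.cached is None:
            session.proceed()
        else:
            session.events.append("cache hit")
            session.conclude(self.cached)

    def after(self, session):
        self.cached = session.result
        session.proceed()


chain = [Trace("outer"), Memo(), Trace("inner")]
first = MiniSession(lambda: 42, chain)
first.initiate()
second = MiniSession(lambda: 42, chain)
second.initiate()
print(first.events)
print(second.events)
```

In the second run the memoizing advice concludes before the task, so the inner advice and the task are skipped while the outer advice still sees the result in its `after()` method.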

initiate(self) 🔗

Start the process of advising an execution.

Returns:

Type Description
bandsaw.result.Result

The final result of the execution after all advices.

Source code in bandsaw/session.py
def initiate(self):
    """
    Start the process of advising an execution.

    Returns:
        bandsaw.result.Result: The final result of the execution after all
            advices.
    """

    self._moderator = _Moderator(
        self._configuration.get_advice_chain(self._advice_chain)
    )

    logger.debug("running extensions before advice")
    for extension in self._configuration.extensions:
        extension.on_before_advice(self.task, self.execution, self.context)

    self.proceed()

    if not self._moderator.is_finished:
        raise RuntimeError(
            f"Not all advice has been applied. "
            f"Misbehaving advice {self._moderator.current_advice}"
        )

    logger.debug("running extensions after advice")
    for extension in self._configuration.extensions:
        extension.on_after_advice(
            self.task, self.execution, self.context, self.result
        )
    return self.result

proceed(self) 🔗

Continue the process of advising with the next advice.

Source code in bandsaw/session.py
def proceed(self):
    """
    Continue the process of advising with the next advice.
    """
    self._moderator.next(self)

restore(self, stream) 🔗

Resume a prior suspended session.

Source code in bandsaw/session.py
def restore(self, stream):
    """
    Resume a prior suspended session.
    """
    self._load_from_zip(stream)
    return self

save(self, stream) 🔗

Suspend the session to be resumed later or elsewhere.

Source code in bandsaw/session.py
def save(self, stream):
    """
    Suspend the session to be resumed later or elsewhere.
    """
    self._store_as_zip(stream)
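The listings only show that the session is stored as a zip archive in the given stream; the layout of that archive is not documented here. The sketch below illustrates the general save and restore pattern with the standard `zipfile` module. The member name `state.json`, the JSON payload and both function names are assumptions made for the example and do not describe bandsaw's actual file format.

```python
import io
import json
import zipfile


def save_state(state, stream):
    """Write a JSON document as a single member of a zip archive."""
    with zipfile.ZipFile(stream, mode='w') as archive:
        archive.writestr('state.json', json.dumps(state))


def restore_state(stream):
    """Read the JSON document back from the zip archive."""
    with zipfile.ZipFile(stream, mode='r') as archive:
        return json.loads(archive.read('state.json'))


buffer = io.BytesIO()
save_state({'task_id': 'abc', 'result': 42}, buffer)

buffer.seek(0)
restored = restore_state(buffer)
```

Working on a binary stream instead of a file name lets the same code write to a local file, an in-memory buffer or any other seekable stream.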

tasks 🔗

Contains classes and functions representing different types of tasks

Task (SerializableValue, ABC) 🔗

Base-class for different types of Tasks that can be executed

Attributes:

Name Type Description
task_id str

A unique identifier for the individual tasks.

kwargs dict

A dictionary with additional arguments provided at task definition.

source str

The python source code as string which defines the task.

bytecode bytes

The compiled byte code of the task definition.

bytecode property readonly 🔗

The compiled byte code of the task definition as bytes.

kwargs property readonly 🔗

Additional arguments defined at task definition.

source property readonly 🔗

The python source code as str which defines the task.

create_task(obj, task_kwargs=None) classmethod 🔗

Factory for creating a task for different Python objects.

Parameters:

Name Type Description Default
obj Any

Python object that should be run as a task.

required
task_kwargs dict

A dictionary containing additional task arguments.

None

Returns:

Type Description
bandsaw.tasks.Task

Instance of the Task class that can execute the task.

Exceptions:

Type Description
TypeError

If there is no support for this type of python object.

Source code in bandsaw/tasks.py
@classmethod
def create_task(cls, obj, task_kwargs=None):
    """
    Factory for creating a task for different Python objects.

    Args:
        obj (Any): Python object that should be run as a task.
        task_kwargs (dict): A dictionary containing additional task arguments.

    Returns:
        bandsaw.tasks.Task: Instance of `Task` class that allows to execute the
            task.

    Raises:
        TypeError: If there is no support for this type of python object.
    """
    if task_kwargs is None:
        task_kwargs = {}
    if isinstance(obj, types.FunctionType):
        if '.<locals>.' in obj.__qualname__:
            return _FunctionWithClosureTask(obj, task_kwargs)
        function_name, module_name = object_as_import(obj)
        return _FunctionTask(function_name, module_name, task_kwargs)
    raise TypeError(f"Unsupported task object of type {type(obj)}")
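The factory distinguishes functions defined inside another function from those defined elsewhere by looking for `.<locals>.` in the function's `__qualname__`. The sketch below reproduces only that check; `classify` and its return labels are hypothetical, and the snippet is meant to be run as a module or script so that `top_level` really is a module-level function.

```python
import types


def top_level():
    return 1


def factory():
    def inner():
        return 2

    return inner


def classify(obj):
    """Mimic the type and qualname checks of the factory."""
    if not isinstance(obj, types.FunctionType):
        raise TypeError(f"Unsupported task object of type {type(obj)}")
    if '.<locals>.' in obj.__qualname__:
        return 'closure'
    return 'importable'


nested = factory()
print(nested.__qualname__)  # factory.<locals>.inner
kinds = (classify(top_level), classify(nested))

try:
    classify(len)  # built-in functions are not types.FunctionType
    rejected = False
except TypeError:
    rejected = True
```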

execute(self, execution) 🔗

Execute the task with the arguments specified by the execution.

Parameters:

Name Type Description Default
execution bandsaw.execution.Execution

The definition which contains how the task should be executed.

required

Returns:

Type Description
bandsaw.result.Result

A Result object with either the returned value from the task or an exception that was raised by the task.

Source code in bandsaw/tasks.py
def execute(self, execution):
    """
    Execute the task with the arguments specified by the execution.

    Args:
        execution (bandsaw.execution.Execution): The definition which contains how
            the task should be executed.

    Returns:
        bandsaw.result.Result: A `Result` object with either the returned value
            from the task or an exception that was raised by the task.
    """
    try:
        result_value = self._execute(execution.args, execution.kwargs)
        result = Result(value=result_value)
    except Exception as error:  # pylint: disable=W0703 # too general exception
        result = Result(exception=error)
    return result
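Because exceptions are captured rather than raised, callers always receive a `Result` and have to inspect it. The sketch below shows this pattern with a stand-in `Result` class that holds only `value` and `exception`; `run_task` is a hypothetical helper that plays the role of `execute()`.

```python
class Result:
    """Stand-in holding either a value or an exception."""

    def __init__(self, value=None, exception=None):
        self.value = value
        self.exception = exception


def run_task(function, args, kwargs):
    """Call the function and wrap its outcome in a Result."""
    try:
        return Result(value=function(*args, **kwargs))
    except Exception as error:  # deliberately broad, as in the listing
        return Result(exception=error)


ok = run_task(divmod, (7, 2), {})
failed = run_task(divmod, (7, 0), {})

print(ok.value, ok.exception)
print(failed.value, type(failed.exception).__name__)
```

Capturing the exception as data allows it to be serialized and handed back to the process that started the task.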

user 🔗

Contains functions related to users

get_current_username() 🔗

Returns the name of the user who is currently running the Python process.

Returns:

Type Description
str

The name of the user on the local system.

Source code in bandsaw/user.py
def get_current_username():
    """
    Returns the name of the user which is currently running the python process.

    Returns:
        str: The name of the user on the local system.
    """
    return pwd.getpwuid(os.getuid())[0]

Last update: 2021-10-16