Skip to content

API

A library for splitting python workflows into separate tasks

advice 🔗

Contains classes and functions for advising tasks.

Advice 🔗

Interface that needs to be implemented by an advice.

The interface is quite simple. One has to implement two different methods, before(session) and after(session), that are called during the process of advising a task execution. Both take a single argument session which contains an instance of the class Session. This object allows the individual advices to influence the task execution by changing the way the task is being called or making changes to the result.

Source code in bandsaw/advice.py
class Advice:
    """
    Interface that needs to be implemented by an advice.

    The interface is quite simple. One has to implement two different methods,
    `before(session)` and `after(session)`, that are called during the process of
    advising a task execution. Both take a single argument `session` which contains an
    instance of the class `Session`. This object allows the individual advices to
    influence the task execution by changing the way the task is being called or
    making changes to the result.
    """

    def before(self, session):  # pylint: disable=R0201 # no-self-use
        """
        Called before the task is actually executed.

        This methods allows the individual advice, to make changes to the way the
        task execution is later executed. In order to continue, the advice MUST either
        call `session.proceed()`, which will continue the process with the next
        advice in the advice chain, or call `session.conclude(result)` with a `Result`
        instance, which will skip the following advices and return without executing
        the task execution at all.

        The default implementation will just call `session.proceed()`.

        Args:
            session (bandsaw.session.Session): The session of the execution.
        """
        session.proceed()

    def after(self, session):  # pylint: disable=R0201 # no-self-use
        """
        Called after the task is actually executed.

        This methods allows the individual advice, to make changes to the result of
        the task execution. The result can be retrieved from the `session`.

        In order to continue, the advice MUST either call `session.proceed()`, which
        will continue the process with current `result` and the next advice in the
        advice chain, or call `session.conclude(result)` with a `Result` instance,
        which will set a different result and continue with it.

        The default implementation will just call `session.proceed()`.

        Args:
            session (bandsaw.session.Session): The session of the execution.
        """
        session.proceed()

after(self, session) 🔗

Called after the task is actually executed.

This methods allows the individual advice, to make changes to the result of the task execution. The result can be retrieved from the session.

In order to continue, the advice MUST either call session.proceed(), which will continue the process with current result and the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will set a different result and continue with it.

The default implementation will just call session.proceed().

Parameters:

Name Type Description Default
session bandsaw.session.Session

The session of the execution.

required
Source code in bandsaw/advice.py
def after(self, session):  # pylint: disable=R0201 # no-self-use
    """
    Called after the task is actually executed.

    This methods allows the individual advice, to make changes to the result of
    the task execution. The result can be retrieved from the `session`.

    In order to continue, the advice MUST either call `session.proceed()`, which
    will continue the process with current `result` and the next advice in the
    advice chain, or call `session.conclude(result)` with a `Result` instance,
    which will set a different result and continue with it.

    The default implementation will just call `session.proceed()`.

    Args:
        session (bandsaw.session.Session): The session of the execution.
    """
    session.proceed()

before(self, session) 🔗

Called before the task is actually executed.

This methods allows the individual advice, to make changes to the way the task execution is later executed. In order to continue, the advice MUST either call session.proceed(), which will continue the process with the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will skip the following advices and return without executing the task execution at all.

The default implementation will just call session.proceed().

Parameters:

Name Type Description Default
session bandsaw.session.Session

The session of the execution.

required
Source code in bandsaw/advice.py
def before(self, session):  # pylint: disable=R0201 # no-self-use
    """
    Called before the task is actually executed.

    This methods allows the individual advice, to make changes to the way the
    task execution is later executed. In order to continue, the advice MUST either
    call `session.proceed()`, which will continue the process with the next
    advice in the advice chain, or call `session.conclude(result)` with a `Result`
    instance, which will skip the following advices and return without executing
    the task execution at all.

    The default implementation will just call `session.proceed()`.

    Args:
        session (bandsaw.session.Session): The session of the execution.
    """
    session.proceed()

advise_task_with_chain(task, execution, configuration, advice_chain='default') 🔗

Executes an Task with additional advices.

Parameters:

Name Type Description Default
task bandsaw.tasks.Task

The task to be executed.

required
execution bandsaw.execution.Execution

The execution definition for the task.

required
configuration bandsaw.config.Configuration

The configuration which should be used during advising.

required
advice_chain str

The name of the advice chain which contains the additional advices to be applied to the task. Defaults to 'default'.

'default'

Returns:

Type Description
bandsaw.result.Result

The result of the task execution.

Source code in bandsaw/advice.py
def advise_task_with_chain(task, execution, configuration, advice_chain='default'):
    """
    Executes an `Task` with additional advices.

    Args:
        task (bandsaw.tasks.Task): The task to be executed.
        execution (bandsaw.execution.Execution): The execution definition for the task.
        configuration (bandsaw.config.Configuration): The configuration which should
            be used during advising.
        advice_chain (str): The name of the advice chain which contains the additional
            advices to be applied to the task. Defaults to 'default'.

    Returns:
        bandsaw.result.Result: The result of the task execution.

    """
    session = Session(task, execution, configuration, advice_chain)
    return session.initiate()

advices special 🔗

Package that contains reusable Advice classes

cache 🔗

Contains Advice that can cache task results in a local file system.

CachingAdvice (Advice) 🔗

Advice that caches results in a local filesystem.

Attributes:

Name Type Description
directory Path

The path to the directory where the results are cached.

Source code in bandsaw/advices/cache.py
class CachingAdvice(Advice):
    """
    Advice that caches results in a local filesystem.

    Attributes:
        directory (Path): The path to the directory where the results are cached.
    """

    def __init__(self, directory):
        self.directory = pathlib.Path(directory)
        logger.info("Caching artifacts in storage '%s'", self.directory)
        super().__init__()

    def before(self, session):
        artifact_id = session.task.task_id
        revision_id = session.execution.execution_id

        cache_item_path = self.directory / artifact_id / revision_id
        session.context['cache-item-path'] = str(cache_item_path)
        if cache_item_path.exists():
            logger.info("Using result from cache '%s'", cache_item_path)

            with open(cache_item_path, 'rb') as stream:
                result = session.serializer.deserialize(stream)
            session.conclude(result)
            return
        session.proceed()

    def after(self, session):
        cache_item_path = pathlib.Path(session.context['cache-item-path'])
        if not cache_item_path.exists():
            cache_item_directory = cache_item_path.parent
            if not cache_item_directory.exists():
                cache_item_directory.mkdir(parents=True)

            logger.info("Storing result in cache '%s'", cache_item_path)

            with open(cache_item_path, 'wb') as stream:
                session.serializer.serialize(session.result, stream)
        session.proceed()
after(self, session) 🔗

Called after the task is actually executed.

This methods allows the individual advice, to make changes to the result of the task execution. The result can be retrieved from the session.

In order to continue, the advice MUST either call session.proceed(), which will continue the process with current result and the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will set a different result and continue with it.

The default implementation will just call session.proceed().

Parameters:

Name Type Description Default
session bandsaw.session.Session

The session of the execution.

required
Source code in bandsaw/advices/cache.py
def after(self, session):
    cache_item_path = pathlib.Path(session.context['cache-item-path'])
    if not cache_item_path.exists():
        cache_item_directory = cache_item_path.parent
        if not cache_item_directory.exists():
            cache_item_directory.mkdir(parents=True)

        logger.info("Storing result in cache '%s'", cache_item_path)

        with open(cache_item_path, 'wb') as stream:
            session.serializer.serialize(session.result, stream)
    session.proceed()
before(self, session) 🔗

Called before the task is actually executed.

This methods allows the individual advice, to make changes to the way the task execution is later executed. In order to continue, the advice MUST either call session.proceed(), which will continue the process with the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will skip the following advices and return without executing the task execution at all.

The default implementation will just call session.proceed().

Parameters:

Name Type Description Default
session bandsaw.session.Session

The session of the execution.

required
Source code in bandsaw/advices/cache.py
def before(self, session):
    artifact_id = session.task.task_id
    revision_id = session.execution.execution_id

    cache_item_path = self.directory / artifact_id / revision_id
    session.context['cache-item-path'] = str(cache_item_path)
    if cache_item_path.exists():
        logger.info("Using result from cache '%s'", cache_item_path)

        with open(cache_item_path, 'rb') as stream:
            result = session.serializer.deserialize(stream)
        session.conclude(result)
        return
    session.proceed()

log 🔗

Contains an Advice implementation which adds logging

JsonFormatter (Formatter) 🔗

Formatter that formats log records into a JSON string.

Source code in bandsaw/advices/log.py
class JsonFormatter(logging.Formatter):
    """
    Formatter that formats log records into a JSON string.
    """

    def format(self, record):
        timestamp = datetime.datetime.fromtimestamp(
            record.created,
            datetime.timezone.utc,
        )

        log_item = {
            "timestamp": datetime.datetime.isoformat(timestamp),
            "logger": record.name,
            "level": record.levelname,
            "message": record.message,
            "threadId": record.thread,
            "threadName": record.threadName,
            "processId": record.process,
            "processName": record.processName,
            "module": record.module,
            "function": record.funcName,
            "path": record.pathname,
            "line_no": record.lineno,
        }

        if record.exc_info is not None:
            log_item.update(
                {
                    'traceback': ''.join(
                        traceback.format_tb(record.exc_info[2])
                    ).strip(),
                    'exception': traceback.format_exception_only(*record.exc_info[:2])[
                        0
                    ].strip(),
                }
            )

        if hasattr(record, 'session'):
            log_item.update(
                {
                    'sessionId': record.session.session_id,
                    'runId': record.session.run_id,
                    'taskId': record.session.task.task_id,
                    'executionId': record.session.execution.execution_id,
                }
            )

        return json.dumps(log_item)
format(self, record) 🔗

Format the specified record as text.

The record's attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

Source code in bandsaw/advices/log.py
def format(self, record):
    timestamp = datetime.datetime.fromtimestamp(
        record.created,
        datetime.timezone.utc,
    )

    log_item = {
        "timestamp": datetime.datetime.isoformat(timestamp),
        "logger": record.name,
        "level": record.levelname,
        "message": record.message,
        "threadId": record.thread,
        "threadName": record.threadName,
        "processId": record.process,
        "processName": record.processName,
        "module": record.module,
        "function": record.funcName,
        "path": record.pathname,
        "line_no": record.lineno,
    }

    if record.exc_info is not None:
        log_item.update(
            {
                'traceback': ''.join(
                    traceback.format_tb(record.exc_info[2])
                ).strip(),
                'exception': traceback.format_exception_only(*record.exc_info[:2])[
                    0
                ].strip(),
            }
        )

    if hasattr(record, 'session'):
        log_item.update(
            {
                'sessionId': record.session.session_id,
                'runId': record.session.run_id,
                'taskId': record.session.task.task_id,
                'executionId': record.session.execution.execution_id,
            }
        )

    return json.dumps(log_item)

LoggingAdvice (Advice) 🔗

An Advice which adds additional logging

Source code in bandsaw/advices/log.py
class LoggingAdvice(Advice):
    """An Advice which adds additional logging"""

    def __init__(self, level=None, formatter=None):
        """
        Create a new instance of the `LoggingAdvice`.

        Args:
            level (int): The log level of the messages to keep. If `None` the level is
                defined by the root logger. Defaults to `None`.
            formatter (logging.Formatter): Formatter to use for writing out the
                individual log messages. Defaults to `JsonFormatter`.
        """
        self._level = level
        if formatter is None:
            formatter = JsonFormatter()
        self._formatter = formatter

    def before(self, session):
        session_log_file_path = session.temp_dir / 'session.log'
        file_handler = logging.FileHandler(
            filename=str(session_log_file_path.absolute()),
        )
        file_handler.set_name('Handler-' + session.session_id)
        if self._level is not None:
            file_handler.setLevel(self._level)
        file_handler.setFormatter(self._formatter)
        session_filter = _SessionFilter(session)
        file_handler.addFilter(session_filter)
        logging.root.addHandler(file_handler)

        logger.info(
            "BEFORE %s:%s with context %s",
            session.task.task_id,
            session.execution.execution_id,
            session.context,
        )
        session.proceed()

    def after(self, session):
        logger.info(
            "AFTER %s:%s with context %s",
            session.task.task_id,
            session.execution.execution_id,
            session.context,
        )
        for handler in logging.root.handlers:
            if handler.get_name() == 'Handler-' + session.session_id:
                logging.root.removeHandler(handler)
                handler.flush()
        session.attachments['session.log'] = session.temp_dir / 'session.log'
        session.proceed()
__init__(self, level=None, formatter=None) special 🔗

Create a new instance of the LoggingAdvice.

Parameters:

Name Type Description Default
level int

The log level of the messages to keep. If None the level is defined by the root logger. Defaults to None.

None
formatter logging.Formatter

Formatter to use for writing out the individual log messages. Defaults to JsonFormatter.

None
Source code in bandsaw/advices/log.py
def __init__(self, level=None, formatter=None):
    """
    Create a new instance of the `LoggingAdvice`.

    Args:
        level (int): The log level of the messages to keep. If `None` the level is
            defined by the root logger. Defaults to `None`.
        formatter (logging.Formatter): Formatter to use for writing out the
            individual log messages. Defaults to `JsonFormatter`.
    """
    self._level = level
    if formatter is None:
        formatter = JsonFormatter()
    self._formatter = formatter
after(self, session) 🔗

Called after the task is actually executed.

This methods allows the individual advice, to make changes to the result of the task execution. The result can be retrieved from the session.

In order to continue, the advice MUST either call session.proceed(), which will continue the process with current result and the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will set a different result and continue with it.

The default implementation will just call session.proceed().

Parameters:

Name Type Description Default
session bandsaw.session.Session

The session of the execution.

required
Source code in bandsaw/advices/log.py
def after(self, session):
    logger.info(
        "AFTER %s:%s with context %s",
        session.task.task_id,
        session.execution.execution_id,
        session.context,
    )
    for handler in logging.root.handlers:
        if handler.get_name() == 'Handler-' + session.session_id:
            logging.root.removeHandler(handler)
            handler.flush()
    session.attachments['session.log'] = session.temp_dir / 'session.log'
    session.proceed()
before(self, session) 🔗

Called before the task is actually executed.

This methods allows the individual advice, to make changes to the way the task execution is later executed. In order to continue, the advice MUST either call session.proceed(), which will continue the process with the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will skip the following advices and return without executing the task execution at all.

The default implementation will just call session.proceed().

Parameters:

Name Type Description Default
session bandsaw.session.Session

The session of the execution.

required
Source code in bandsaw/advices/log.py
def before(self, session):
    session_log_file_path = session.temp_dir / 'session.log'
    file_handler = logging.FileHandler(
        filename=str(session_log_file_path.absolute()),
    )
    file_handler.set_name('Handler-' + session.session_id)
    if self._level is not None:
        file_handler.setLevel(self._level)
    file_handler.setFormatter(self._formatter)
    session_filter = _SessionFilter(session)
    file_handler.addFilter(session_filter)
    logging.root.addHandler(file_handler)

    logger.info(
        "BEFORE %s:%s with context %s",
        session.task.task_id,
        session.execution.execution_id,
        session.context,
    )
    session.proceed()

metrics 🔗

Contains an Advice implementation which gathers metrics.

MetricsAdvice (Advice) 🔗

An Advice which gathers metrics.

Underneath this advice uses the python multimeter library for collecting the metrics.

Source code in bandsaw/advices/metrics.py
class MetricsAdvice(Advice):
    """An Advice which gathers metrics.

    Underneath this advice uses the python `multimeter` library for collecting the
    metrics.
    """

    def __init__(self, meter, directory=None, file_format=multimeter.JsonFormat()):
        """
        Creates a new MetricsAdvice that gathers metrics.

        Args:
            meter (multimeter.Multimeter): The Multimeter instance which is used for
                gathering the metrics.
            directory (str): Path to a directory, where the metrics are temporarily
                stored. If `None` or omitted, the session temporary directory is used.
            file_format (multimeter.FileFormat): File format that defines the format
                in which the gathered metrics are stored. Defaults to
                `multimeter.JsonFormat`.
        """
        self._multimeter = meter
        if directory is not None:
            self._directory = pathlib.Path(directory)
        else:
            self._directory = None
        self._file_format = file_format

    def before(self, session):
        tags = {
            'run_id': session.run_id,
            'task_id': session.task.task_id,
            'execution_id': session.execution.execution_id,
            'session_id': session.session_id,
        }
        advice_parameters = session.task.advice_parameters
        additional_tags = advice_parameters.get('metrics', {}).get('tags', {})
        tags.update(additional_tags)

        logger.info("Measurement id %s with tags %s", session.session_id, tags)
        measurement = self._multimeter.measure(session.session_id, **tags)
        session.context['metrics.measurement'] = measurement

        logger.debug("Measurement start")
        measurement.start()
        session.proceed()

    def after(self, session):
        measurement = session.context.pop('metrics.measurement')
        logger.debug("Measurement end")
        measurement.end()

        directory = self._directory or session.temp_dir
        storage = multimeter.FileStorage(directory, self._file_format)
        storage.store(measurement.result)
        metrics_file_name = measurement.identifier + self._file_format.extension
        metrics_attachment_name = 'metrics' + self._file_format.extension
        session.attachments[metrics_attachment_name] = directory / metrics_file_name
        session.proceed()
__init__(self, meter, directory=None, file_format=<multimeter.storages.file.JsonFormat object at 0x7f8a8f98df10>) special 🔗

Creates a new MetricsAdvice that gathers metrics.

Parameters:

Name Type Description Default
meter multimeter.Multimeter

The Multimeter instance which is used for gathering the metrics.

required
directory str

Path to a directory, where the metrics are temporarily stored. If None or omitted, the session temporary directory is used.

None
file_format multimeter.FileFormat

File format that defines the format in which the gathered metrics are stored. Defaults to multimeter.JsonFormat.

<multimeter.storages.file.JsonFormat object at 0x7f8a8f98df10>
Source code in bandsaw/advices/metrics.py
def __init__(self, meter, directory=None, file_format=multimeter.JsonFormat()):
    """
    Creates a new MetricsAdvice that gathers metrics.

    Args:
        meter (multimeter.Multimeter): The Multimeter instance which is used for
            gathering the metrics.
        directory (str): Path to a directory, where the metrics are temporarily
            stored. If `None` or omitted, the session temporary directory is used.
        file_format (multimeter.FileFormat): File format that defines the format
            in which the gathered metrics are stored. Defaults to
            `multimeter.JsonFormat`.
    """
    self._multimeter = meter
    if directory is not None:
        self._directory = pathlib.Path(directory)
    else:
        self._directory = None
    self._file_format = file_format
after(self, session) 🔗

Called after the task is actually executed.

This methods allows the individual advice, to make changes to the result of the task execution. The result can be retrieved from the session.

In order to continue, the advice MUST either call session.proceed(), which will continue the process with current result and the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will set a different result and continue with it.

The default implementation will just call session.proceed().

Parameters:

Name Type Description Default
session bandsaw.session.Session

The session of the execution.

required
Source code in bandsaw/advices/metrics.py
def after(self, session):
    measurement = session.context.pop('metrics.measurement')
    logger.debug("Measurement end")
    measurement.end()

    directory = self._directory or session.temp_dir
    storage = multimeter.FileStorage(directory, self._file_format)
    storage.store(measurement.result)
    metrics_file_name = measurement.identifier + self._file_format.extension
    metrics_attachment_name = 'metrics' + self._file_format.extension
    session.attachments[metrics_attachment_name] = directory / metrics_file_name
    session.proceed()
before(self, session) 🔗

Called before the task is actually executed.

This methods allows the individual advice, to make changes to the way the task execution is later executed. In order to continue, the advice MUST either call session.proceed(), which will continue the process with the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will skip the following advices and return without executing the task execution at all.

The default implementation will just call session.proceed().

Parameters:

Name Type Description Default
session bandsaw.session.Session

The session of the execution.

required
Source code in bandsaw/advices/metrics.py
def before(self, session):
    tags = {
        'run_id': session.run_id,
        'task_id': session.task.task_id,
        'execution_id': session.execution.execution_id,
        'session_id': session.session_id,
    }
    advice_parameters = session.task.advice_parameters
    additional_tags = advice_parameters.get('metrics', {}).get('tags', {})
    tags.update(additional_tags)

    logger.info("Measurement id %s with tags %s", session.session_id, tags)
    measurement = self._multimeter.measure(session.session_id, **tags)
    session.context['metrics.measurement'] = measurement

    logger.debug("Measurement start")
    measurement.start()
    session.proceed()

ssh 🔗

Contains code for running tasks remotely via SSH

Remote 🔗

Definition of a remote machine.

Remotes are used for executing sessions on remote machines.

Attributes:

Name Type Description
host str

The hostname of the machine, where this interpreter is run.

port int

The port where ssh is listening for connections. Defaults to 22.

user str

The name of the user, under which the interpreter will run. Defaults to the name of the local user.

key_file str

Path to a file containing the key to use for authentication.

interpreter bandsaw.interpreter.Interpreter

The interpreter which should be used, including its executable and python path.

directory str

Remote directory where temporary files are stored. If None, defaults to '/tmp'.

Source code in bandsaw/advices/ssh.py
class Remote:  # pylint: disable=too-few-public-methods
    """
    Definition of a remote machine.

    Remotes are used for executing sessions on remote machines.

    Attributes:
        host (str): The hostname of the machine, where this interpreter is run.
        port (int): The port where ssh is listening for connections. Defaults to 22.
        user (str): The name of the user, under which the interpreter will run.
            Defaults to the name of the local user.
        key_file (str): Path to a file containing the key to use for authentication.
        interpreter (bandsaw.interpreter.Interpreter): The interpreter which should be
            used, including its executable and python path.
        directory (str): Remote directory where temporary files are stored. If `None`,
            defaults to '/tmp'.
    """

    def __init__(
        self,
        host=None,
        port=None,
        user=None,
        key_file=None,
        interpreter=None,
        directory=None,
    ):  # pylint: disable=too-many-arguments
        if host is None:
            raise ValueError("Remote needs a host, `None` given.")
        self.host = host
        self.port = port or 22
        self.user = user or get_current_username()
        self.key_file = key_file
        if interpreter is None:
            raise ValueError("Remote needs an interpreter, `None` given.")
        self.interpreter = interpreter
        self.directory = pathlib.Path(directory or '/tmp')

SshAdvice (Advice) 🔗

Advice that moves and proceeds a session on a remote machine via SSH

Source code in bandsaw/advices/ssh.py
class SshAdvice(Advice):
    """Advice that moves and proceeds a session on a remote machine via SSH"""

    def __init__(self, directory=None, backend=SshCommandLineBackend()):
        """
        Create a new instance.

        Args:
            directory (str): The local directory where temporary files are stored to
                exchange data between the local and the remote machine. If `None`, the
                temporary directory from the session is used.
        """
        if directory is None:
            self.directory = None
            logger.info("Using session temporary directory for exchange data")
        else:
            self.directory = pathlib.Path(directory)
            logger.info("Using directory %s for exchange data", self.directory)

        self.remotes = {}
        self._backend = backend
        super().__init__()

    def add_remote(self, remote, name='default'):
        """
        Add a new definition of a remote machine.

        Args:
            remote (bandsaw.advices.ssh.Remote): Definition of the remote machine.
            name (str): Name of the remote. Defaults to `default`.

        Returns:
            bandsaw.advices.ssh.SshAdvice: The advice with the added remote.
        """
        self.remotes[name] = remote
        return self

    def before(self, session):

        temp_dir = self.directory or session.temp_dir

        session_in_path = temp_dir / f'session-{session.session_id}-in.zip'
        session_out_path = temp_dir / f'session-{session.session_id}-out.zip'

        logger.info("Writing session to %s", session_in_path)
        with io.FileIO(str(session_in_path), mode='w') as stream:
            session.save(stream)

        parameters = session.task.advice_parameters
        remote_name = parameters.get('ssh', {}).get('remote', 'default')
        remote = self.remotes[remote_name]

        remote_run_directory = remote.directory / session.execution.execution_id
        logger.info(
            "Creating run directory %s on host %s",
            remote_run_directory,
            remote.host,
        )
        self._backend.create_dir(
            remote,
            remote_run_directory,
        )

        logger.info("Copying over distribution archive to host %s", remote.host)
        distribution_archive_path = session.distribution_archive.path
        remote_distribution_archive_path = (
            remote_run_directory / distribution_archive_path.name
        )
        self._backend.copy_file_to_remote(
            remote,
            distribution_archive_path,
            remote_distribution_archive_path,
        )

        logger.info("Copying over session to host %s", remote.host)
        remote_session_in_path = remote_run_directory / session_in_path.name
        self._backend.copy_file_to_remote(
            remote,
            session_in_path,
            remote_session_in_path,
        )

        remote_session_out_path = remote_run_directory / session_out_path.name

        logger.info(
            "Running remote process using interpreter %s", remote.interpreter.executable
        )
        self._backend.execute_remote(
            remote,
            remote.interpreter.executable,
            str(remote_distribution_archive_path),
            '--input',
            str(remote_session_in_path),
            '--output',
            str(remote_session_out_path),
            '--run-id',
            session.run_id,
        )
        # environment = self.interpreter.environment
        # environment['PYTHONPATH'] = ':'.join(self.interpreter.path)
        logger.info("Remote process exited")

        logger.info("Copying over session result from host %s", remote.host)
        self._backend.copy_file_from_remote(
            remote,
            remote_session_out_path,
            session_out_path,
        )

        logger.info(
            "Cleaning up remote directory %s on host %s",
            remote_run_directory,
            remote.host,
        )
        self._backend.delete_dir(remote, remote_run_directory)

        logger.info("Restore local session from %s", session_out_path)
        stream = io.FileIO(str(session_out_path), mode='r')
        session.restore(stream)

        logger.info(
            "Cleaning up local sessions %s, %s",
            session_in_path,
            session_out_path,
        )
        session_in_path.unlink()
        session_out_path.unlink()

        logger.info("Proceed local session")
        session.proceed()

    def after(self, session):
        logger.info("after called in process %d", os.getpid())
        logger.info("Remote process created result %s", session.result)
        logger.info("Returning to end remote session and continue in parent")
__init__(self, directory=None, backend=<bandsaw.advices.ssh.SshCommandLineBackend object at 0x7f8a8f968250>) special 🔗

Create a new instance.

Parameters:

Name Type Description Default
directory str

The local directory where temporary files are stored to exchange data between the local and the remote machine. If None, the temporary directory from the session is used.

None
Source code in bandsaw/advices/ssh.py
def __init__(self, directory=None, backend=SshCommandLineBackend()):
    """
    Create a new instance.

    Args:
        directory (str): The local directory where temporary files are stored to
            exchange data between the local and the remote machine. If `None`, the
            temporary directory from the session is used.
    """
    if directory is None:
        self.directory = None
        logger.info("Using session temporary directory for exchange data")
    else:
        self.directory = pathlib.Path(directory)
        logger.info("Using directory %s for exchange data", self.directory)

    self.remotes = {}
    self._backend = backend
    super().__init__()
add_remote(self, remote, name='default') 🔗

Add a new definition of a remote machine.

Parameters:

Name Type Description Default
remote bandsaw.advices.ssh.Remote

Definition of the remote machine.

required
name str

Name of the remote. Defaults to default.

'default'

Returns:

Type Description
bandsaw.advices.ssh.SshAdvice

The advice with the added remote.

Source code in bandsaw/advices/ssh.py
def add_remote(self, remote, name='default'):
    """
    Add a new definition of a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Definition of the remote machine.
        name (str): Name of the remote. Defaults to `default`.

    Returns:
        bandsaw.advices.ssh.SshAdvice: The advice with the added remote.
    """
    self.remotes[name] = remote
    return self
after(self, session) 🔗

Called after the task is actually executed.

This methods allows the individual advice, to make changes to the result of the task execution. The result can be retrieved from the session.

In order to continue, the advice MUST either call session.proceed(), which will continue the process with current result and the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will set a different result and continue with it.

The default implementation will just call session.proceed().

Parameters:

Name Type Description Default
session bandsaw.session.Session

The session of the execution.

required
Source code in bandsaw/advices/ssh.py
def after(self, session):
    logger.info("after called in process %d", os.getpid())
    logger.info("Remote process created result %s", session.result)
    logger.info("Returning to end remote session and continue in parent")
before(self, session) 🔗

Called before the task is actually executed.

This methods allows the individual advice, to make changes to the way the task execution is later executed. In order to continue, the advice MUST either call session.proceed(), which will continue the process with the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will skip the following advices and return without executing the task execution at all.

The default implementation will just call session.proceed().

Parameters:

Name Type Description Default
session bandsaw.session.Session

The session of the execution.

required
Source code in bandsaw/advices/ssh.py
def before(self, session):

    temp_dir = self.directory or session.temp_dir

    session_in_path = temp_dir / f'session-{session.session_id}-in.zip'
    session_out_path = temp_dir / f'session-{session.session_id}-out.zip'

    logger.info("Writing session to %s", session_in_path)
    with io.FileIO(str(session_in_path), mode='w') as stream:
        session.save(stream)

    parameters = session.task.advice_parameters
    remote_name = parameters.get('ssh', {}).get('remote', 'default')
    remote = self.remotes[remote_name]

    remote_run_directory = remote.directory / session.execution.execution_id
    logger.info(
        "Creating run directory %s on host %s",
        remote_run_directory,
        remote.host,
    )
    self._backend.create_dir(
        remote,
        remote_run_directory,
    )

    logger.info("Copying over distribution archive to host %s", remote.host)
    distribution_archive_path = session.distribution_archive.path
    remote_distribution_archive_path = (
        remote_run_directory / distribution_archive_path.name
    )
    self._backend.copy_file_to_remote(
        remote,
        distribution_archive_path,
        remote_distribution_archive_path,
    )

    logger.info("Copying over session to host %s", remote.host)
    remote_session_in_path = remote_run_directory / session_in_path.name
    self._backend.copy_file_to_remote(
        remote,
        session_in_path,
        remote_session_in_path,
    )

    remote_session_out_path = remote_run_directory / session_out_path.name

    logger.info(
        "Running remote process using interpreter %s", remote.interpreter.executable
    )
    self._backend.execute_remote(
        remote,
        remote.interpreter.executable,
        str(remote_distribution_archive_path),
        '--input',
        str(remote_session_in_path),
        '--output',
        str(remote_session_out_path),
        '--run-id',
        session.run_id,
    )
    # environment = self.interpreter.environment
    # environment['PYTHONPATH'] = ':'.join(self.interpreter.path)
    logger.info("Remote process exited")

    logger.info("Copying over session result from host %s", remote.host)
    self._backend.copy_file_from_remote(
        remote,
        remote_session_out_path,
        session_out_path,
    )

    logger.info(
        "Cleaning up remote directory %s on host %s",
        remote_run_directory,
        remote.host,
    )
    self._backend.delete_dir(remote, remote_run_directory)

    logger.info("Restore local session from %s", session_out_path)
    stream = io.FileIO(str(session_out_path), mode='r')
    session.restore(stream)

    logger.info(
        "Cleaning up local sessions %s, %s",
        session_in_path,
        session_out_path,
    )
    session_in_path.unlink()
    session_out_path.unlink()

    logger.info("Proceed local session")
    session.proceed()

SshBackend (ABC) 🔗

Interface definition for different SSH backends.

Source code in bandsaw/advices/ssh.py
class SshBackend(abc.ABC):
    """
    Interface definition for different SSH backends.
    """

    def create_dir(self, remote, remote_path):
        """
        Create a directory on a remote machine.

        Args:
            remote (bandsaw.advices.ssh.Remote): Remote machine where a directory will
                be created.
            remote_path (str): Remote path to the directory that should be created.
        """

    def copy_file_to_remote(self, remote, local_path, remote_path):
        """
        Copies a local file or directory to a remote machine.

        Args:
            remote (bandsaw.advices.ssh.Remote): Remote machine to which a file should
                be copied.
            local_path (pathlib.Path): Local path to the file which should be copied
                over.
            remote_path (pathlib.Path): Remote path of the file where it should be
                copied to.
        """

    def copy_file_from_remote(self, remote, remote_path, local_path):
        """
        Copies a remote file or directory to the local file system.

        Args:
            remote (bandsaw.advices.ssh.Remote): Remote machine from which a file
                should be copied.
            remote_path (pathlib.Path): Remote path of the file which should be
                copied.
            local_path (pathlib.Path): Local path to the file where it should be
                copied to.
        """

    def execute_remote(self, remote, executable, *arguments):
        """
        Executes an executable on a remote machine.

        Args:
            remote (bandsaw.advices.ssh.Remote): Remote machine where `executable`
                will be executed.
            executable (str): Remote path of the executable which should be executed.
            *arguments (str): Positional arguments which are the command line
                parameter for the `executable`.

        Raises:
            subprocess.CalledProcessError: If the remote process ends with an error.
                Its return code will be available through the exception.
        """

    def delete_dir(self, remote, remote_path):
        """
        Delete a directory on a remote machine.

        Args:
            remote (bandsaw.advices.ssh.Remote): Remote machine where a directory will
                be deleted.
            remote_path (str): Remote path to the directory that should be deleted.
        """
copy_file_from_remote(self, remote, remote_path, local_path) 🔗

Copies a remote file or directory to the local file system.

Parameters:

Name Type Description Default
remote bandsaw.advices.ssh.Remote

Remote machine from which a file should be copied.

required
remote_path pathlib.Path

Remote path of the file which should be copied.

required
local_path pathlib.Path

Local path to the file where it should be copied to.

required
Source code in bandsaw/advices/ssh.py
def copy_file_from_remote(self, remote, remote_path, local_path):
    """
    Copies a remote file or directory to the local file system.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine from which a file
            should be copied.
        remote_path (pathlib.Path): Remote path of the file which should be
            copied.
        local_path (pathlib.Path): Local path to the file where it should be
            copied to.
    """
copy_file_to_remote(self, remote, local_path, remote_path) 🔗

Copies a local file or directory to a remote machine.

Parameters:

Name Type Description Default
remote bandsaw.advices.ssh.Remote

Remote machine to which a file should be copied.

required
local_path pathlib.Path

Local path to the file which should be copied over.

required
remote_path pathlib.Path

Remote path of the file where it should be copied to.

required
Source code in bandsaw/advices/ssh.py
def copy_file_to_remote(self, remote, local_path, remote_path):
    """
    Copies a local file or directory to a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine to which a file should
            be copied.
        local_path (pathlib.Path): Local path to the file which should be copied
            over.
        remote_path (pathlib.Path): Remote path of the file where it should be
            copied to.
    """
create_dir(self, remote, remote_path) 🔗

Create a directory on a remote machine.

Parameters:

Name Type Description Default
remote bandsaw.advices.ssh.Remote

Remote machine where a directory will be created.

required
remote_path str

Remote path to the directory that should be created.

required
Source code in bandsaw/advices/ssh.py
def create_dir(self, remote, remote_path):
    """
    Create a directory on a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine where a directory will
            be created.
        remote_path (str): Remote path to the directory that should be created.
    """
delete_dir(self, remote, remote_path) 🔗

Delete a directory on a remote machine.

Parameters:

Name Type Description Default
remote bandsaw.advices.ssh.Remote

Remote machine where a directory will be deleted.

required
remote_path str

Remote path to the directory that should be deleted.

required
Source code in bandsaw/advices/ssh.py
def delete_dir(self, remote, remote_path):
    """
    Delete a directory on a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine where a directory will
            be deleted.
        remote_path (str): Remote path to the directory that should be deleted.
    """
execute_remote(self, remote, executable, *arguments) 🔗

Executes an executable on a remote machine.

Parameters:

Name Type Description Default
remote bandsaw.advices.ssh.Remote

Remote machine where executable will be executed.

required
executable str

Remote path of the executable which should be executed.

required
*arguments str

Positional arguments which are the command line parameter for the executable.

()

Exceptions:

Type Description
subprocess.CalledProcessError

If the remote process ends with an error. Its return code will be available through the exception.

Source code in bandsaw/advices/ssh.py
def execute_remote(self, remote, executable, *arguments):
    """
    Executes an executable on a remote machine.

    Args:
        remote (bandsaw.advices.ssh.Remote): Remote machine where `executable`
            will be executed.
        executable (str): Remote path of the executable which should be executed.
        *arguments (str): Positional arguments which are the command line
            parameter for the `executable`.

    Raises:
        subprocess.CalledProcessError: If the remote process ends with an error.
            Its return code will be available through the exception.
    """

SshCommandLineBackend (SshBackend) 🔗

SSH backend that uses the SSH command line tools.

Source code in bandsaw/advices/ssh.py
class SshCommandLineBackend(SshBackend):
    """
    SSH backend that uses the SSH command line tools.
    """

    def create_dir(self, remote, remote_path):
        return self._run(
            [
                'ssh',
                '-p',
                str(remote.port),
            ]
            + self._key_file_option(remote)
            + [
                self.login(remote),
                'mkdir',
                '-p',
                str(remote_path),
            ]
        )

    def copy_file_to_remote(self, remote, local_path, remote_path):
        copy_destination = self.get_remote_path(remote, remote_path)
        self._run(
            [
                'scp',
                '-P',
                str(remote.port),
            ]
            + self._key_file_option(remote)
            + [
                str(local_path),
                copy_destination,
            ],
        )

    def copy_file_from_remote(self, remote, remote_path, local_path):
        copy_source = self.get_remote_path(remote, remote_path)
        self._run(
            [
                'scp',
                '-P',
                str(remote.port),
            ]
            + self._key_file_option(remote)
            + [
                copy_source,
                str(local_path),
            ],
        )

    def execute_remote(self, remote, executable, *arguments):
        return self._run(
            [
                'ssh',
                '-p',
                str(remote.port),
            ]
            + self._key_file_option(remote)
            + [
                self.login(remote),
                str(executable),
            ]
            + list(arguments),
        )

    def delete_dir(self, remote, remote_path):
        return self._run(
            [
                'ssh',
                '-p',
                str(remote.port),
            ]
            + self._key_file_option(remote)
            + [
                self.login(remote),
                'rm',
                '-Rf',
                str(remote_path),
            ]
        )

    @staticmethod
    def _run(command):
        logger.debug("running command %s", command)
        subprocess.check_call(command)

    @staticmethod
    def login(remote):
        """Returns the destination of the remote in form of <user>@<host>"""
        return f"{remote.user}@{remote.host}"

    def get_remote_path(self, remote, path):
        """Returns the remote path in form of <user>@<host>:<path>"""
        return f"{self.login(remote)}:{path.absolute()}"

    @staticmethod
    def _key_file_option(remote):
        if remote.key_file is not None:
            return [
                '-i',
                remote.key_file,
            ]
        return []
copy_file_from_remote(self, remote, remote_path, local_path) 🔗

Copies a remote file or directory to the local file system.

Parameters:

Name Type Description Default
remote bandsaw.advices.ssh.Remote

Remote machine from which a file should be copied.

required
remote_path pathlib.Path

Remote path of the file which should be copied.

required
local_path pathlib.Path

Local path to the file where it should be copied to.

required
Source code in bandsaw/advices/ssh.py
def copy_file_from_remote(self, remote, remote_path, local_path):
    copy_source = self.get_remote_path(remote, remote_path)
    self._run(
        [
            'scp',
            '-P',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            copy_source,
            str(local_path),
        ],
    )
copy_file_to_remote(self, remote, local_path, remote_path) 🔗

Copies a local file or directory to a remote machine.

Parameters:

Name Type Description Default
remote bandsaw.advices.ssh.Remote

Remote machine to which a file should be copied.

required
local_path pathlib.Path

Local path to the file which should be copied over.

required
remote_path pathlib.Path

Remote path of the file where it should be copied to.

required
Source code in bandsaw/advices/ssh.py
def copy_file_to_remote(self, remote, local_path, remote_path):
    copy_destination = self.get_remote_path(remote, remote_path)
    self._run(
        [
            'scp',
            '-P',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            str(local_path),
            copy_destination,
        ],
    )
create_dir(self, remote, remote_path) 🔗

Create a directory on a remote machine.

Parameters:

Name Type Description Default
remote bandsaw.advices.ssh.Remote

Remote machine where a directory will be created.

required
remote_path str

Remote path to the directory that should be created.

required
Source code in bandsaw/advices/ssh.py
def create_dir(self, remote, remote_path):
    return self._run(
        [
            'ssh',
            '-p',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            self.login(remote),
            'mkdir',
            '-p',
            str(remote_path),
        ]
    )
delete_dir(self, remote, remote_path) 🔗

Delete a directory on a remote machine.

Parameters:

Name Type Description Default
remote bandsaw.advices.ssh.Remote

Remote machine where a directory will be deleted.

required
remote_path str

Remote path to the directory that should be deleted.

required
Source code in bandsaw/advices/ssh.py
def delete_dir(self, remote, remote_path):
    return self._run(
        [
            'ssh',
            '-p',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            self.login(remote),
            'rm',
            '-Rf',
            str(remote_path),
        ]
    )
execute_remote(self, remote, executable, *arguments) 🔗

Executes an executable on a remote machine.

Parameters:

Name Type Description Default
remote bandsaw.advices.ssh.Remote

Remote machine where executable will be executed.

required
executable str

Remote path of the executable which should be executed.

required
*arguments str

Positional arguments which are the command line parameter for the executable.

()

Exceptions:

Type Description
subprocess.CalledProcessError

If the remote process ends with an error. Its return code will be available through the exception.

Source code in bandsaw/advices/ssh.py
def execute_remote(self, remote, executable, *arguments):
    return self._run(
        [
            'ssh',
            '-p',
            str(remote.port),
        ]
        + self._key_file_option(remote)
        + [
            self.login(remote),
            str(executable),
        ]
        + list(arguments),
    )
get_remote_path(self, remote, path) 🔗

Returns the remote path in form of @:

Source code in bandsaw/advices/ssh.py
def get_remote_path(self, remote, path):
    """Returns the remote path in form of <user>@<host>:<path>"""
    return f"{self.login(remote)}:{path.absolute()}"
login(remote) staticmethod 🔗

Returns the destination of the remote in form of @

Source code in bandsaw/advices/ssh.py
@staticmethod
def login(remote):
    """Returns the destination of the remote in form of <user>@<host>"""
    return f"{remote.user}@{remote.host}"

subprocess 🔗

Contains Advice implementation that runs the execution in a subprocess

SubprocessAdvice (Advice) 🔗

Advice that runs in a subprocess

Source code in bandsaw/advices/subprocess.py
class SubprocessAdvice(Advice):
    """Advice that runs in a subprocess"""

    def __init__(self, directory=None, interpreter=None):
        """
        Create a new instance.

        Args:
            directory (str): The directory where temporary files are stored to
                exchange data between both processes. If `None` a temporary directory
                is used.
            interpreter (bandsaw.interpreter.Interpreter): The interpreter to use in
                the subprocess. If `None` the same interpreter will be used.
        """
        if directory is None:
            self.directory = None
            logger.info("Using temporary session directory")
        else:
            self.directory = pathlib.Path(directory)
            logger.info("Using directory %s", self.directory)
        self.interpreter = interpreter or Interpreter()
        super().__init__()

    def before(self, session):
        logger.info("before called in process %d", os.getpid())

        temp_dir = self.directory or session.temp_dir
        session_in_path = temp_dir / f'session-{session.session_id}-in.zip'
        session_out_path = temp_dir / f'session-{session.session_id}-out.zip'
        archive_path = session.distribution_archive.path

        logger.info("Writing session to %s", session_in_path)
        with io.FileIO(session_in_path, mode='w') as stream:
            session.save(stream)

        logger.info(
            "Continue session in subprocess using interpreter %s and "
            "distribution archive %s",
            self.interpreter.executable,
            archive_path,
        )
        environment = self.interpreter.environment
        environment['PYTHONPATH'] = ':'.join(self.interpreter.path)
        subprocess.check_call(
            [
                self.interpreter.executable,
                archive_path,
                '--input',
                session_in_path,
                '--output',
                session_out_path,
                '--run-id',
                session.run_id,
            ],
            env=environment,
        )
        logger.info("Sub process exited")

        logger.info("Reading session from %s", session_out_path)
        with io.FileIO(session_out_path, mode='r') as stream:
            session.restore(stream)

        logger.info(
            "Cleaning up session files %s, %s",
            session_in_path,
            session_out_path,
        )
        session_in_path.unlink()
        session_out_path.unlink()

        logger.info("proceed() session in parent process")
        session.proceed()

    def after(self, session):
        logger.info("after called in process %d", os.getpid())
        logger.info("Sub process created result %s", session.result)
        logger.info("Returning to end session and continue in parent")
__init__(self, directory=None, interpreter=None) special 🔗

Create a new instance.

Parameters:

Name Type Description Default
directory str

The directory where temporary files are stored to exchange data between both processes. If None a temporary directory is used.

None
interpreter bandsaw.interpreter.Interpreter

The interpreter to use in the subprocess. If None the same interpreter will be used.

None
Source code in bandsaw/advices/subprocess.py
def __init__(self, directory=None, interpreter=None):
    """
    Create a new instance.

    Args:
        directory (str): The directory where temporary files are stored to
            exchange data between both processes. If `None` a temporary directory
            is used.
        interpreter (bandsaw.interpreter.Interpreter): The interpreter to use in
            the subprocess. If `None` the same interpreter will be used.
    """
    if directory is None:
        self.directory = None
        logger.info("Using temporary session directory")
    else:
        self.directory = pathlib.Path(directory)
        logger.info("Using directory %s", self.directory)
    self.interpreter = interpreter or Interpreter()
    super().__init__()
after(self, session) 🔗

Called after the task is actually executed.

This methods allows the individual advice, to make changes to the result of the task execution. The result can be retrieved from the session.

In order to continue, the advice MUST either call session.proceed(), which will continue the process with current result and the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will set a different result and continue with it.

The default implementation will just call session.proceed().

Parameters:

Name Type Description Default
session bandsaw.session.Session

The session of the execution.

required
Source code in bandsaw/advices/subprocess.py
def after(self, session):
    logger.info("after called in process %d", os.getpid())
    logger.info("Sub process created result %s", session.result)
    logger.info("Returning to end session and continue in parent")
before(self, session) 🔗

Called before the task is actually executed.

This methods allows the individual advice, to make changes to the way the task execution is later executed. In order to continue, the advice MUST either call session.proceed(), which will continue the process with the next advice in the advice chain, or call session.conclude(result) with a Result instance, which will skip the following advices and return without executing the task execution at all.

The default implementation will just call session.proceed().

Parameters:

Name Type Description Default
session bandsaw.session.Session

The session of the execution.

required
Source code in bandsaw/advices/subprocess.py
def before(self, session):
    logger.info("before called in process %d", os.getpid())

    temp_dir = self.directory or session.temp_dir
    session_in_path = temp_dir / f'session-{session.session_id}-in.zip'
    session_out_path = temp_dir / f'session-{session.session_id}-out.zip'
    archive_path = session.distribution_archive.path

    logger.info("Writing session to %s", session_in_path)
    with io.FileIO(session_in_path, mode='w') as stream:
        session.save(stream)

    logger.info(
        "Continue session in subprocess using interpreter %s and "
        "distribution archive %s",
        self.interpreter.executable,
        archive_path,
    )
    environment = self.interpreter.environment
    environment['PYTHONPATH'] = ':'.join(self.interpreter.path)
    subprocess.check_call(
        [
            self.interpreter.executable,
            archive_path,
            '--input',
            session_in_path,
            '--output',
            session_out_path,
            '--run-id',
            session.run_id,
        ],
        env=environment,
    )
    logger.info("Sub process exited")

    logger.info("Reading session from %s", session_out_path)
    with io.FileIO(session_out_path, mode='r') as stream:
        session.restore(stream)

    logger.info(
        "Cleaning up session files %s, %s",
        session_in_path,
        session_out_path,
    )
    session_in_path.unlink()
    session_out_path.unlink()

    logger.info("proceed() session in parent process")
    session.proceed()

config 🔗

Contains the class and functions to configure bandsaw.

Configuration 🔗

Class that represents a configuration for bandsaw.

Attributes:

Name Type Description
temporary_directory pathlib.Path

The path to a directory where temporary files are stored.

Source code in bandsaw/config.py
class Configuration:
    """
    Class that represents a configuration for bandsaw.

    Attributes:
        temporary_directory (pathlib.Path): The path to a directory where temporary
            files are stored.
    """

    def __init__(self):
        self._advice_chains = {}
        self.extensions = []
        self.serializer = PickleSerializer()
        self.add_advice_chain()
        stack = traceback.extract_stack(limit=2)
        config_module_file_path = stack[0].filename
        self.module_name = get_loaded_module_name_by_path(config_module_file_path)
        logger.info("Config created in module: %s", self.module_name)
        self.distribution_modules = []
        self.temporary_directory = None
        self.set_temp_directory(tempfile.mkdtemp(prefix='bandsaw'))

    def add_advice_chain(self, *advices, name='default'):
        """
        Add a new advice chain to the configuration.

        Each advice chain has a unique `name`. If multiple chains with the same name
        are added to the configuration, the last chain overwrites all previous chains.

        Args:
            *advices (bandsaw.advice.Advice): A tuple of advices for this chain.
            name (str): The name of the advice chain, defaults to 'default' if not
                specified.

        Returns:
            bandsaw.config.Configuration: The configuration to which the chain was
                added.

        """
        self._advice_chains[name] = advices
        return self

    def get_advice_chain(self, name):
        """
        Returns the advice chain with the given name.

        Args:
            name (str): Name of the wanted advice chain.

        Returns:
            List[bandsaw.advice.Advice]: The advice chain with the given name.

        Raises:
            KeyError: If no chain with the specified name is configured.
        """
        return self._advice_chains.get(name)

    def add_extension(self, extension):
        """
        Add an `Extension` to the configuration.

        `Extensions` are objects that can implement callbacks to be informed by
        bandsaw about certain conditions, e.g. the creation of new tasks or the final
        result of an execution.

        Args:
            extension (bandsaw.extension.Extension): An object implementing the
                `Extension`.

        Returns:
            bandsaw.config.Configuration: The configuration to which the extension was
                added.
        """
        self.extensions.append(extension)
        return self

    def set_serializer(self, serializer):
        """
        Sets the serialize which defines how tasks and results will be serialized.

        Args:
            serializer (bandsaw.serialization.Serializer): The serializer to use for
                serializing objects.

        Returns:
            bandsaw.config.Configuration: The configuration to which the extension was
                added.
        """
        self.serializer = serializer
        return self

    def add_modules_for_distribution(self, *modules):
        """
        Add modules that should be included in the distribution archive.

        Args:
            *modules (List[str]): Positional arguments with strings, that contain the
                names of modules, which should be included in the distribution
                archive.

        Returns:
            bandsaw.config.Configuration: The configuration with the added modules.
        """
        self.distribution_modules.extend(modules)
        return self

    def set_temp_directory(self, directory):
        """
        Sets the temporary directory.

        Args:
            directory (Union[str, pathlib.Path]): Path to the directory, where
                temporary files will be stored.
        """
        self.temporary_directory = pathlib.Path(directory)
        atexit.register(
            lambda path: shutil.rmtree(path, ignore_errors=True),
            str(self.temporary_directory),
        )

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self.module_name == other.module_name

    def __hash__(self):
        return hash(self.module_name)

add_advice_chain(self, *advices, *, name='default') 🔗

Add a new advice chain to the configuration.

Each advice chain has a unique name. If multiple chains with the same name are added to the configuration, the last chain overwrites all previous chains.

Parameters:

Name Type Description Default
*advices bandsaw.advice.Advice

A tuple of advices for this chain.

()
name str

The name of the advice chain, defaults to 'default' if not specified.

'default'

Returns:

Type Description
bandsaw.config.Configuration

The configuration to which the chain was added.

Source code in bandsaw/config.py
def add_advice_chain(self, *advices, name='default'):
    """
    Add a new advice chain to the configuration.

    Each advice chain has a unique `name`. If multiple chains with the same name
    are added to the configuration, the last chain overwrites all previous chains.

    Args:
        *advices (bandsaw.advice.Advice): A tuple of advices for this chain.
        name (str): The name of the advice chain, defaults to 'default' if not
            specified.

    Returns:
        bandsaw.config.Configuration: The configuration to which the chain was
            added.

    """
    self._advice_chains[name] = advices
    return self

add_extension(self, extension) 🔗

Add an Extension to the configuration.

Extensions are objects that can implement callbacks to be informed by bandsaw about certain conditions, e.g. the creation of new tasks or the final result of an execution.

Parameters:

Name Type Description Default
extension bandsaw.extension.Extension

An object implementing the Extension.

required

Returns:

Type Description
bandsaw.config.Configuration

The configuration to which the extension was added.

Source code in bandsaw/config.py
def add_extension(self, extension):
    """
    Add an `Extension` to the configuration.

    `Extensions` are objects that can implement callbacks to be informed by
    bandsaw about certain conditions, e.g. the creation of new tasks or the final
    result of an execution.

    Args:
        extension (bandsaw.extension.Extension): An object implementing the
            `Extension`.

    Returns:
        bandsaw.config.Configuration: The configuration to which the extension was
            added.
    """
    self.extensions.append(extension)
    return self

add_modules_for_distribution(self, *modules) 🔗

Add modules that should be included in the distribution archive.

Parameters:

Name Type Description Default
*modules List[str]

Positional arguments with strings, that contain the names of modules, which should be included in the distribution archive.

()

Returns:

Type Description
bandsaw.config.Configuration

The configuration with the added modules.

Source code in bandsaw/config.py
def add_modules_for_distribution(self, *modules):
    """
    Add modules that should be included in the distribution archive.

    Args:
        *modules (List[str]): Positional arguments with strings, that contain the
            names of modules, which should be included in the distribution
            archive.

    Returns:
        bandsaw.config.Configuration: The configuration with the added modules.
    """
    self.distribution_modules.extend(modules)
    return self

get_advice_chain(self, name) 🔗

Returns the advice chain with the given name.

Parameters:

Name Type Description Default
name str

Name of the wanted advice chain.

required

Returns:

Type Description
List[bandsaw.advice.Advice]

The advice chain with the given name.

Exceptions:

Type Description
KeyError

If no chain with the specified name is configured.

Source code in bandsaw/config.py
def get_advice_chain(self, name):
    """
    Returns the advice chain with the given name.

    Args:
        name (str): Name of the wanted advice chain.

    Returns:
        List[bandsaw.advice.Advice]: The advice chain with the given name.

    Raises:
        KeyError: If no chain with the specified name is configured.
    """
    return self._advice_chains.get(name)

set_serializer(self, serializer) 🔗

Sets the serialize which defines how tasks and results will be serialized.

Parameters:

Name Type Description Default
serializer bandsaw.serialization.Serializer

The serializer to use for serializing objects.

required

Returns:

Type Description
bandsaw.config.Configuration

The configuration to which the extension was added.

Source code in bandsaw/config.py
def set_serializer(self, serializer):
    """
    Sets the serialize which defines how tasks and results will be serialized.

    Args:
        serializer (bandsaw.serialization.Serializer): The serializer to use for
            serializing objects.

    Returns:
        bandsaw.config.Configuration: The configuration to which the extension was
            added.
    """
    self.serializer = serializer
    return self

set_temp_directory(self, directory) 🔗

Sets the temporary directory.

Parameters:

Name Type Description Default
directory Union[str, pathlib.Path]

Path to the directory, where temporary files will be stored.

required
Source code in bandsaw/config.py
def set_temp_directory(self, directory):
    """
    Sets the temporary directory.

    Args:
        directory (Union[str, pathlib.Path]): Path to the directory, where
            temporary files will be stored.
    """
    self.temporary_directory = pathlib.Path(directory)
    atexit.register(
        lambda path: shutil.rmtree(path, ignore_errors=True),
        str(self.temporary_directory),
    )

get_configuration(configuration_module=None) 🔗

Return a configuration.

Parameters:

Name Type Description Default
configuration_module str

The module name of a module, which contains the configuration. The module needs to define a member 'configuration', which contains an instance of Configuration. If no module name is given, a default configuration is returned based on the value of the BANDSAW_CONFIG_MODULE environment variable. If this variable is not set, we default to 'bandsaw_config'.

None

Returns:

Type Description
bandsaw.config.Configuration

The configuration.

Exceptions:

Type Description
ModuleNotFoundError

If no module exists with name configuration_module.

LookupError

If the module doesn't contain a variable 'configuration`.

TypeError

If the variable configuration is not of type Configuration.

Source code in bandsaw/config.py
def get_configuration(configuration_module=None):
    """
    Return a configuration.

    Args:
        configuration_module (str): The module name of a module, which contains the
            configuration. The module needs to define a member 'configuration', which
            contains an instance of `Configuration`. If no module name is given, a
            default configuration is returned based on the value of the
            `BANDSAW_CONFIG_MODULE` environment variable. If this variable is not set,
            we default to 'bandsaw_config'.

    Returns:
        bandsaw.config.Configuration: The configuration.

    Raises:
        ModuleNotFoundError: If no module exists with name `configuration_module`.
        LookupError: If the module doesn't contain a variable 'configuration`.
        TypeError: If the variable `configuration` is not of type `Configuration`.
    """
    if configuration_module is None:
        default_configuration_module_name = os.getenv(
            CONFIGURATION_MODULE_ENV_VARIABLE,
            CONFIGURATION_MODULE_DEFAULT,
        )
        configuration_module = default_configuration_module_name

    if configuration_module not in _configurations:
        try:
            _load_configuration_module(configuration_module)
        except ModuleNotFoundError:
            logger.warning(
                "No module found for config %s",
                configuration_module,
            )
            raise
    return _configurations[configuration_module]

context 🔗

Classes that represent the context used in advising tasks.

Context (SerializableValue) 🔗

Class for representing the context for advising tasks.

The context contains of a set of arbitrary key-value mappings that can be used by the Advice classes to store state or communicate with other advices.

Source code in bandsaw/context.py
class Context(SerializableValue):
    """
    Class for representing the context for advising tasks.

    The context contains of a set of arbitrary key-value mappings that can be used
    by the `Advice` classes to store state or communicate with other advices.


    """

    def __init__(self, attributes=None):
        self._attributes = attributes or {}

    def serialized(self):
        data = {
            'attributes': self._attributes,
        }
        return data

    @classmethod
    def deserialize(cls, values):
        return Context(values['attributes'])

    @property
    def attributes(self):
        """
        A set of arbitrary key-value mappings for the `Advice` classes.

        `Advice` can add to this mapping and use this as a way of keeping state.
        """
        return self._attributes

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self._attributes == other._attributes

attributes property readonly 🔗

A set of arbitrary key-value mappings for the Advice classes.

Advice can add to this mapping and use this as a way of keeping state.

deserialize(values) classmethod 🔗

Returns a new instance of a value from its serialized representation.

Source code in bandsaw/context.py
@classmethod
def deserialize(cls, values):
    return Context(values['attributes'])

serialized(self) 🔗

Returns a serializable representation of the value.

Source code in bandsaw/context.py
def serialized(self):
    data = {
        'attributes': self._attributes,
    }
    return data

decorator 🔗

Contains decorators that allow to define individual tasks

task(*task_args, *, config=None, chain=None, **task_kwargs) 🔗

Decorator that is used to define a function as as task.

The decorator can be used in two different ways, standalone:

Examples:

>>> @task
... def my_task_function():
...      pass

or with additional configuration.

Examples:

>>> @task(config='my.config')
... def my_task_function():
...      pass

Parameters:

Name Type Description Default
config str

The name of the configuration module to use for this task. If not given, the default configuration is used.

None
chain str

The name of the advice chain to use for advising this task. If not given, 'default' is used.

None
*task_args

Positional args given to the decorator OR the decorated function. If the decorator is used WITHOUT providing additional configuration, task_args contains a tuple with a single item that is the function to be used as a task. If there is additional configuration given, task_args contains the positional arguments of the call of the decorator.

()
**task_kwargs

Keyword args given to the decorator. If the decorator is used WITHOUT providing additional configuration, task_kwargs is an empty dictionary. If there is additional configuration given, task_kwargscontains the keyword arguments of the call of the decorator.

{}

Returns:

Type Description
Callable

Returns a callable that wraps the decorated function.

Exceptions:

Type Description
ModuleNotFoundError

If the configured configuration module does not exist.

ValueError

If the specified advice chain does not exist.

RuntimeError

If the task is configured with multiple positional arguments.

Source code in bandsaw/decorator.py
def task(*task_args, config=None, chain=None, **task_kwargs):
    """
    Decorator that is used to define a function as as task.

    The decorator can be used in two different ways, standalone:

    Example:
        >>> @task
        ... def my_task_function():
        ...      pass

    or with additional configuration.

    Example:
        >>> @task(config='my.config')
        ... def my_task_function():
        ...      pass


    Args:
        config (str): The name of the configuration module to use for this task.
            If not given, the default configuration is used.
        chain (str): The name of the advice chain to use for advising this task.
            If not given, 'default' is used.
        *task_args: Positional args given to the decorator OR the decorated function.
            If the decorator is used WITHOUT providing additional configuration,
            `task_args` contains a tuple with a single item that is the function to be
            used as a task. If there is additional configuration given, `task_args`
            contains the positional arguments of the call of the decorator.
        **task_kwargs: Keyword args given to the decorator.
            If the decorator is used WITHOUT providing additional configuration,
            `task_kwargs` is an empty dictionary. If there is additional configuration
            given, `task_kwargs`contains the keyword arguments of the call of the
            decorator.

    Returns:
        Callable: Returns a callable that wraps the decorated function.

    Raises:
        ModuleNotFoundError: If the configured configuration module does not exist.
        ValueError: If the specified advice chain does not exist.
        RuntimeError: If the task is configured with multiple positional arguments.
    """

    config_module = config
    configuration = get_configuration(config_module)

    chain_name = chain or 'default'
    advice_chain = configuration.get_advice_chain(chain_name)
    if advice_chain is None:
        raise ValueError(f"Unknown advice chain {chain_name}")

    def decorate_function(func):
        logger.info("Decorate function '%s'", func)

        logger.info("Creating task for function '%s'", func)
        the_task = Task.create_task(func, task_kwargs)

        def inner(*func_args, **func_kwargs):

            execution_id = _calculate_execution_id(
                func_args,
                func_kwargs,
                configuration.serializer,
            )
            execution = Execution(execution_id, func_args, func_kwargs)

            result = advise_task_with_chain(
                the_task,
                execution,
                configuration,
                chain_name,
            )
            if result.exception:
                raise result.exception
            return result.value

        inner.__wrapped__ = func
        inner.bandsaw_task = the_task
        inner.bandsaw_configuration = configuration
        return inner

    if len(task_args) == 1 and len(task_kwargs) == 0:
        return decorate_function(task_args[0])
    if len(task_args) == 0 and (
        len(task_kwargs) > 0 or chain is not None or config is not None
    ):
        return decorate_function
    # This shouldn't happen if the decorator is properly used.
    raise RuntimeError("Invalid 'task' decorator.")

distribution 🔗

Contains functions for creating distribution archives.

Distribution archives are the way how bandsaw transfers code between different machines. They are normal zip files, that contain bandsaw itself, a main module which allows to execute the archive and to continue sessions and possibly some additional dependencies.

DistributionArchive 🔗

Class that represents a distribution archive.

A distribution archive contains all the code necessary for running a task. It can be used for running a task on a different machine by copying over the archive.

Attributes:

Name Type Description
path pathlib.Path

The path to the file containing the code.

modules tuple[str]

The names of the modules that are included in this archive.

Source code in bandsaw/distribution.py
class DistributionArchive:
    """
    Class that represents a distribution archive.

    A distribution archive contains all the code necessary for running a task. It can
    be used for running a task on a different machine by copying over the archive.

    Attributes:
        path (pathlib.Path): The path to the file containing the code.
        modules (tuple[str]): The names of the modules that are included in this
            archive.
    """

    def __init__(self, path, *modules):
        self._path = path
        self.modules = modules

    @property
    def path(self):
        """
        Returns:
            pathlib.Path: The path to the archive file. The file itself is created
                lazily, when the path is accessed the first time. This makes sure,
                we only create the archive if necessary.
        """
        if not self._path.exists():
            _create_distribution_archive(self._path, self.modules)
        return self._path

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self.modules == other.modules

    def __hash__(self):
        return hash(self.modules)

path property readonly 🔗

Returns:

Type Description
pathlib.Path

The path to the archive file. The file itself is created lazily, when the path is accessed the first time. This makes sure, we only create the archive if necessary.

get_distribution_archive(configuration) 🔗

Returns a distribution archive for a given configuration.

Parameters:

Name Type Description Default
configuration bandsaw.config.Configuration

The configuration for which the distribution package should be returned.

required

Returns:

Type Description
bandsaw.distribution.DistributionArchive

The archive for the configuration.

Source code in bandsaw/distribution.py
def get_distribution_archive(configuration):
    """
    Returns a distribution archive for a given configuration.

    Args:
        configuration (bandsaw.config.Configuration): The configuration for which the
            distribution package should be returned.

    Returns:
        bandsaw.distribution.DistributionArchive: The archive for the configuration.
    """
    archive = _CACHE.get_archive(configuration)
    if archive is None:
        archive_path = pathlib.Path(
            tempfile.mktemp(suffix='.pyz', prefix='distribution-')
        )
        modules = [
            '__main__',
            'bandsaw',
            configuration.module_name,
            *configuration.distribution_modules,
        ]
        archive = DistributionArchive(archive_path, *modules)
        _CACHE.put_archive(configuration, archive)
    return archive

execution 🔗

Contains classes and functions around an execution of a task

Execution (SerializableValue) 🔗

Class that defines an execution of a Task.

It contains the arguments that should be used for the task and an unique identifier derived from those arguments.

Attributes:

Name Type Description
execution_id str

A string identifying this execution.

args tuple[Any]

The positional arguments for the task to use in this execution.

kwargs Dict[Any,Any]

The keyword arguments for the task to use in this execution.

Source code in bandsaw/execution.py
class Execution(SerializableValue):
    """
    Class that defines an execution of a `Task`.

    It contains the arguments that should be used for the task and an unique
    identifier derived from those arguments.

    Attributes:
        execution_id (str): A string identifying this execution.
        args (tuple[Any]): The positional arguments for the task to use in this
            execution.
        kwargs (Dict[Any,Any]): The keyword arguments for the task to use in this
            execution.
    """

    def __init__(self, execution_id, args=None, kwargs=None):
        self.execution_id = execution_id
        self.args = args or ()
        self.kwargs = kwargs or {}

    def serialized(self):
        return {
            'execution_id': self.execution_id,
            'args': self.args,
            'kwargs': self.kwargs,
        }

    @classmethod
    def deserialize(cls, values):
        return Execution(
            values['execution_id'],
            values['args'],
            values['kwargs'],
        )

deserialize(values) classmethod 🔗

Returns a new instance of a value from its serialized representation.

Source code in bandsaw/execution.py
@classmethod
def deserialize(cls, values):
    return Execution(
        values['execution_id'],
        values['args'],
        values['kwargs'],
    )

serialized(self) 🔗

Returns a serializable representation of the value.

Source code in bandsaw/execution.py
def serialized(self):
    return {
        'execution_id': self.execution_id,
        'args': self.args,
        'kwargs': self.kwargs,
    }

extensions 🔗

Contains an API for extensions that can be used in bandsaw

Extension 🔗

Class that defines the interface of extensions.

An extension can define different callbacks that are called by bandsaw and allows to extend some existing functionality (e.g. by setting additional values in a context before it is handled by all advices) or integrate other systems. Other than Advice, an Extension is globally defined in a config and therefore applies to all tasks.

Source code in bandsaw/extensions.py
class Extension:
    """
    Class that defines the interface of extensions.

    An extension can define different callbacks that are called by bandsaw and allows
    to extend some existing functionality (e.g. by setting additional values in a
    context before it is handled by all advices) or integrate other systems.
    Other than `Advice`, an `Extension` is globally defined in a config and therefore
    applies to all tasks.
    """

    def on_init(self, configuration):
        """
        Called when a bandsaw configuration has been initialized.

        Args:
            configuration (bandsaw.config.Configuration): The configuration object
                which contains the config that has been loaded.
        """

    def on_session_created(self, session):
        """
        Called before bandsaw advises a task.

        This is called before any advice is applied.

        Args:
            session (bandsaw.session.Session): The new session.
        """

    def on_session_finished(self, session):
        """
        Called after bandsaw advised a task.

        This is called after all advices have been applied and the final result is
        available.

        Args:
            session (bandsaw.session.Session): The session.
        """

on_init(self, configuration) 🔗

Called when a bandsaw configuration has been initialized.

Parameters:

Name Type Description Default
configuration bandsaw.config.Configuration

The configuration object which contains the config that has been loaded.

required
Source code in bandsaw/extensions.py
def on_init(self, configuration):
    """
    Called when a bandsaw configuration has been initialized.

    Args:
        configuration (bandsaw.config.Configuration): The configuration object
            which contains the config that has been loaded.
    """

on_session_created(self, session) 🔗

Called before bandsaw advises a task.

This is called before any advice is applied.

Parameters:

Name Type Description Default
session bandsaw.session.Session

The new session.

required
Source code in bandsaw/extensions.py
def on_session_created(self, session):
    """
    Called before bandsaw advises a task.

    This is called before any advice is applied.

    Args:
        session (bandsaw.session.Session): The new session.
    """

on_session_finished(self, session) 🔗

Called after bandsaw advised a task.

This is called after all advices have been applied and the final result is available.

Parameters:

Name Type Description Default
session bandsaw.session.Session

The session.

required
Source code in bandsaw/extensions.py
def on_session_finished(self, session):
    """
    Called after bandsaw advised a task.

    This is called after all advices have been applied and the final result is
    available.

    Args:
        session (bandsaw.session.Session): The session.
    """

identifier 🔗

Functions for generating identifier for arbitrary python objects.

identifier_from_bytes(buffer) 🔗

Derive an identifier from a bytebuffer.

Parameters:

Name Type Description Default
buffer Union[bytes,bytearray]

The binary data from which to derive an identifier.

required

Returns:

Type Description
str

The identifier in form of a string of a hexadecimal number.

Source code in bandsaw/identifier.py
def identifier_from_bytes(buffer):
    """
    Derive an identifier from a bytebuffer.

    Args:
        buffer (Union[bytes,bytearray]): The binary data from which to derive an
            identifier.

    Returns:
        str: The identifier in form of a string of a hexadecimal number.
    """
    identifier = hashlib.sha256(buffer).hexdigest()[:_ID_LENGTH]
    return identifier

identifier_from_string(string) 🔗

Derive an identifier from a string.

Parameters:

Name Type Description Default
string str

The string from which to derive an identifier.

required

Returns:

Type Description
str

The identifier in form of a string of a hexadecimal number.

Source code in bandsaw/identifier.py
def identifier_from_string(string):
    """
    Derive an identifier from a string.

    Args:
        string (str): The string from which to derive an identifier.

    Returns:
        str: The identifier in form of a string of a hexadecimal number.
    """
    identifier = identifier_from_bytes(string.encode('utf-8'))
    return identifier

infos 🔗

Infos about values, arguments and tasks

value_info(value) 🔗

Information about a value.

The information contains a string representation and a type name, but additional infos can be included as well.

Parameters:

Name Type Description Default
value Any

The value for which the infos should be returned.

required

Returns:

Type Description
Dict[str,str]

A dictionary containing the infos about the value.

Source code in bandsaw/infos.py
def value_info(value):
    """
    Information about a value.

    The information contains a string representation and a type name, but additional
    infos can be included as well.

    Args:
        value (Any): The value for which the infos should be returned.

    Returns:
        Dict[str,str]: A dictionary containing the infos about the value.
    """
    value_type = type(value)
    if isinstance(value, set):
        value = sorted(value)
    string_value = str(value)

    if len(string_value) > 100:
        string_value = string_value[:85] + '...' + string_value[-12:]

    info = {
        'type': value_type.__qualname__,
        'value': string_value,
    }
    if hasattr(value, '__len__'):
        info['size'] = str(len(value))

    if hasattr(value, 'info'):
        info.update(value.info())

    return info

interpreter 🔗

Contains classes regarding python interpreters

Interpreter (SerializableValue) 🔗

Class for representing different python interpreters.

This class is used to contain the information about specific python interpreters that are used within the library. In order to support multiple different interpreters there will be the option to define the interpreter as part of config. Currently only a single interpreter is automatically defined.

Source code in bandsaw/interpreter.py
class Interpreter(SerializableValue):
    """
    Class for representing different python interpreters.

    This class is used to contain the information about specific python interpreters
    that are used within the library. In order to support multiple different
    interpreters there will be the option to define the interpreter as part of config.
    Currently only a single interpreter is automatically defined.
    """

    def __init__(
        self,
        path=None,
        executable=None,
    ):
        """
        Create a new interpreter instance.

        Args:
            path (List[str]): A list of directory paths, to be used as $PYTHONPATH.
                If `None` the current `sys.path` is used.
            executable (str): The path to the python executable for this interpreter.
                If `None` the current `sys.executable` is used.
        """
        if path is None:
            self._path = tuple(sys.path)
        else:
            self._path = tuple(path)
        self.executable = executable or sys.executable
        self._environment = {}

    def set_environment(self, **environment):
        """
        Set the environment variables to use for this interpreter.

        A call to this methods overwrites all variables that have been set previously.

        Args:
            **environment: Arbitrary keyword arguments where the name of the keyword
                corresponds to the name of the environment variable and the values
                will be the values set in the environment.
        """
        self._environment = environment
        return self

    @property
    def environment(self):
        """The environment variables to be set for the interpreter."""
        return dict(self._environment)

    @property
    def path(self):
        """The python path items that will be used."""
        return tuple(self._path)

    def serialized(self):
        return {
            'path': self._path,
            'executable': self.executable,
            'environment': self._environment,
        }

    @classmethod
    def deserialize(cls, values):
        return Interpreter(
            path=values['path'],
            executable=values['executable'],
        ).set_environment(**values['environment'])

environment property readonly 🔗

The environment variables to be set for the interpreter.

path property readonly 🔗

The python path items that will be used.

__init__(self, path=None, executable=None) special 🔗

Create a new interpreter instance.

Parameters:

Name Type Description Default
path List[str]

A list of directory paths, to be used as $PYTHONPATH. If None the current sys.path is used.

None
executable str

The path to the python executable for this interpreter. If None the current sys.executable is used.

None
Source code in bandsaw/interpreter.py
def __init__(
    self,
    path=None,
    executable=None,
):
    """
    Create a new interpreter instance.

    Args:
        path (List[str]): A list of directory paths, to be used as $PYTHONPATH.
            If `None` the current `sys.path` is used.
        executable (str): The path to the python executable for this interpreter.
            If `None` the current `sys.executable` is used.
    """
    if path is None:
        self._path = tuple(sys.path)
    else:
        self._path = tuple(path)
    self.executable = executable or sys.executable
    self._environment = {}

deserialize(values) classmethod 🔗

Returns a new instance of a value from its serialized representation.

Source code in bandsaw/interpreter.py
@classmethod
def deserialize(cls, values):
    return Interpreter(
        path=values['path'],
        executable=values['executable'],
    ).set_environment(**values['environment'])

serialized(self) 🔗

Returns a serializable representation of the value.

Source code in bandsaw/interpreter.py
def serialized(self):
    return {
        'path': self._path,
        'executable': self.executable,
        'environment': self._environment,
    }

set_environment(self, **environment) 🔗

Set the environment variables to use for this interpreter.

A call to this methods overwrites all variables that have been set previously.

Parameters:

Name Type Description Default
**environment

Arbitrary keyword arguments where the name of the keyword corresponds to the name of the environment variable and the values will be the values set in the environment.

{}
Source code in bandsaw/interpreter.py
def set_environment(self, **environment):
    """
    Set the environment variables to use for this interpreter.

    A call to this methods overwrites all variables that have been set previously.

    Args:
        **environment: Arbitrary keyword arguments where the name of the keyword
            corresponds to the name of the environment variable and the values
            will be the values set in the environment.
    """
    self._environment = environment
    return self

io 🔗

Contains classes related to input/output of data.

BytearrayGeneratorToStream (RawIOBase) 🔗

Stream that reads from a generator that yields bytes/bytearrays.

Source code in bandsaw/io.py
class BytearrayGeneratorToStream(io.RawIOBase):
    """
    Stream that reads from a generator that yields bytes/bytearrays.
    """

    def __init__(self, generator):
        self._generator = generator
        self._source = None
        self._source_position = 0
        super().__init__()

    def readinto(self, buffer):
        buffer_size = len(buffer)
        position = 0
        generator_empty = False
        while True:
            if self._source is None:
                self._source = next(self._generator, None)
                self._source_position = 0
                if self._source is None:
                    logger.debug("no new bytearray from generator, ending stream")
                    generator_empty = True
                else:
                    logger.debug(
                        "got new bytearray from generator of size %d",
                        len(self._source),
                    )
            if self._source is not None:
                source_bytes_left = len(self._source) - self._source_position

                buffer_free = buffer_size - position
                bytes_to_copy = min(buffer_free, source_bytes_left)
                logger.debug(
                    "copy %d bytes from current value to stream via buffer of size %d",
                    bytes_to_copy,
                    buffer_size,
                )
                buffer_end = position + bytes_to_copy
                source_end = self._source_position + bytes_to_copy
                buffer[position:buffer_end] = self._source[
                    self._source_position : source_end
                ]
                position += bytes_to_copy
                if bytes_to_copy < source_bytes_left:
                    self._source_position += bytes_to_copy
                else:
                    self._source = None
            buffer_full = position == buffer_size
            if buffer_full or generator_empty:
                break
        return position

read_stream_to_generator(stream, buffer_size=8192) 🔗

Read from a stream into a generator yielding bytes.

Parameters:

Name Type Description Default
stream io.Stream

The stream to read bytes from.

required
buffer_size int

The buffer size to read. Each individual bytes buffer returned by the generator has a maximum size of buffer_size. Defaults to 8192 if not set.

8192

Yields:

Type Description
bytes

A byte buffer of maximum size of buffer_size.

Source code in bandsaw/io.py
def read_stream_to_generator(stream, buffer_size=8192):
    """
    Read from a stream into a generator yielding `bytes`.

    Args:
        stream (io.Stream): The stream to read bytes from.
        buffer_size (int): The buffer size to read. Each individual `bytes` buffer
            returned by the generator has a maximum size of `buffer_size`. Defaults
            to `8192` if not set.

    Yields:
        bytes: A byte buffer of maximum size of `buffer_size`.

    """
    buffer = bytearray(buffer_size)
    while True:
        read = stream.readinto(buffer)
        if read:
            yield buffer[:read]
        else:
            break

modules 🔗

Utility functions for handling python modules.

get_loaded_module_name_by_path(file_path) 🔗

Determine the name of an already loaded module by its file path.

Parameters:

Name Type Description Default
file_path str

File path of the python file containing the module.

required

Returns:

Type Description
str

The name of the module, or None if the file isn't loaded as a module.

Source code in bandsaw/modules.py
def get_loaded_module_name_by_path(file_path):
    """
    Determine the name of an already loaded module by its file path.

    Args:
        file_path (str): File path of the python file containing the module.

    Returns:
        str: The name of the module, or `None` if the file isn't loaded as a module.

    """
    real_path = os.path.realpath(file_path)
    for name, module in sys.modules.items():
        if hasattr(module, '__file__'):
            module_path = os.path.realpath(module.__file__)
            if module_path == real_path:
                return name
    return None

import_object(object_name, module_name) 🔗

Import a python object from a module.

Parameters:

Name Type Description Default
object_name str

The name under which the object is defined in the module.

required
module_name str

The name of the module from which the object should be imported.

required

Returns:

Type Description
object

The python object defined under the name.

Exceptions:

Type Description
AttributeError

If nothing is defined with name object_name in the referenced module.

ModuleNotFoundError

If no module exists with the name module_name.

Source code in bandsaw/modules.py
def import_object(object_name, module_name):
    """
    Import a python object from a module.

    Args:
        object_name (str): The name under which the object is defined in the module.
        module_name (str): The name of the module from which the object should be
            imported.

    Returns:
        object: The python object defined under the name.

    Raises:
        AttributeError: If nothing is defined with name `object_name` in the
            referenced module.
        ModuleNotFoundError: If no module exists with the name `module_name`.
    """
    module = importlib.import_module(module_name)
    return getattr(module, object_name)

object_as_import(obj) 🔗

Returns the name and module of an object, which can be used for importing it.

Parameters:

Name Type Description Default
obj object

An arbitrary Python object.

required

Returns:

Type Description
Tuple(str, str)

Returns a tuple of the object name and the module name in which the object is defined.

Exceptions:

Type Description
ValueError

If obj doesn't have a name that can be directly imported, e.g. because it is defined within a local class.

Note

If obj is defined within the __main__ script, the function tries to determine a name for the __main__ module, under which it could be imported from other scripts.

Source code in bandsaw/modules.py
def object_as_import(obj):
    """
    Returns the name and module of an object, which can be used for importing it.

    Args:
        obj (object): An arbitrary Python object.

    Returns:
        Tuple(str, str): Returns a tuple of the object name and the module name in
            which the object is defined.

    Raises:
        ValueError: If `obj` doesn't have a name that can be directly imported, e.g.
            because it is defined within a local class.

    Note:
        If `obj` is defined within the `__main__` script, the function tries to
        determine a name for the `__main__` module, under which it could be imported
        from other scripts.

    """
    object_name = obj.__name__
    module_name = obj.__module__

    if module_name == '__main__':
        module_file_path = sys.modules['__main__'].__file__
        module_name = _guess_module_name_by_path(module_file_path, sys.path)

    if '<locals>' in obj.__qualname__:
        raise ValueError("Can't import local functions.")
    return object_name, module_name

result 🔗

Contains code for representing the result of tasks.

Result (SerializableValue) 🔗

Class to encapsulate the result of a task execution.

Attributes:

Name Type Description
value Any

The value that is returned by the task. None is the task raised an exception during execution.

exception Exception

The exception that was raised during execution, None if no exception was raised.

Source code in bandsaw/result.py
class Result(SerializableValue):
    """
    Class to encapsulate the result of a task execution.

    Attributes:
        value (Any): The value that is returned by the task. `None` is the task raised
            an exception during execution.
        exception (Exception): The exception that was raised during execution, `None`
            if no exception was raised.
    """

    def __init__(self, value=None, exception=None):
        self.value = value
        self.exception = exception

    def serialized(self):
        values = {
            "value": self.value,
            "exception": self.exception,
        }
        return values

    @classmethod
    def deserialize(cls, values):
        value = values["value"]
        exception = values["exception"]
        return Result(value=value, exception=exception)

    def __eq__(self, other):
        value_equals = self.value == other.value
        exception_type_equals = isinstance(other.exception, type(self.exception))
        exception_args_equals = getattr(self.exception, "args", None) == getattr(
            other.exception, "args", None
        )
        return value_equals and exception_type_equals and exception_args_equals

    def __hash__(self):
        return hash((self.value, repr(self.exception)))

deserialize(values) classmethod 🔗

Returns a new instance of a value from its serialized representation.

Source code in bandsaw/result.py
@classmethod
def deserialize(cls, values):
    value = values["value"]
    exception = values["exception"]
    return Result(value=value, exception=exception)

serialized(self) 🔗

Returns a serializable representation of the value.

Source code in bandsaw/result.py
def serialized(self):
    values = {
        "value": self.value,
        "exception": self.exception,
    }
    return values

run 🔗

Functions for managing the run id.

get_run_id() 🔗

Returns the run id.

The run id is a unique identifier that is specific to an individual run of a workflow. It stays the same across all task executions and can be used for tracking metrics and differentiating between different runs of the same workflow where task_id and run_id stay the same.

Returns:

Type Description
str

The unique run id.

Source code in bandsaw/run.py
def get_run_id():
    """
    Returns the run id.

    The run id is a unique identifier that is specific to an individual run of a
    workflow. It stays the same across all task executions and can be used for
    tracking metrics and differentiating between different runs of the same workflow
    where task_id and run_id stay the same.

    Returns:
        str: The unique run id.
    """
    if _RUN_ID is None:
        set_run_id(str(uuid.uuid1()))
    return _RUN_ID

set_run_id(run_id) 🔗

Sets the run id.

Setting the run id explicitly is usually not necessary. The function is mainly used when task executions are run in a different process to make sure the run id is consistent with the spawning process, but it can be used e.g. if an external system provides a unique identifier for a specific workflow run.

When set_run_id(run_id) is being used, it must be run before the first tasks are actually defined.

Exceptions:

Type Description
RuntimeError

If the run id was already set before.

Source code in bandsaw/run.py
def set_run_id(run_id):
    """
    Sets the run id.

    Setting the run id explicitly is usually not necessary. The function is mainly
    used when task executions are run in a different process to make sure the run id
    is consistent with the spawning process, but it can be used e.g. if an external
    system provides a unique identifier for a specific workflow run.

    When `set_run_id(run_id)` is being used, it must be run before the first tasks
    are actually defined.

    Raises:
        RuntimeError: If the run id was already set before.
    """
    global _RUN_ID  # pylint: disable=global-statement
    if _RUN_ID is not None:
        logger.error("run_id already set to %s when trying to set again", _RUN_ID)
        raise RuntimeError("Run ID was already set")
    logger.info("Set run_id to %s", run_id)
    _RUN_ID = run_id

runner 🔗

Contains main() function to continue sessions from files

main(args) 🔗

Main function that can be used for proceeding a session.

This function allows to read a session from a file, proceed it until it returns and then save the state of the session to a new file. It is used for running tasks in a separate process or on different machines.

Parameters:

Name Type Description Default
args tuple[str]

The arguments taken from the command line.

required
Source code in bandsaw/runner.py
def main(args):
    """
    Main function that can be used for proceeding a session.

    This function allows to read a session from a file, proceed it until it returns
    and then save the state of the session to a new file. It is used for running
    tasks in a separate process or on different machines.

    Args:
        args (tuple[str]): The arguments taken from the command line.
    """
    hostname = os.uname()[1]
    log_format = (
        f"{{asctime}} {hostname} {{process: >5d}} {{thread: >5d}} "
        f"{{name}} {{levelname}}: {{message}}"
    )
    logging.basicConfig(level=logging.INFO, format=log_format, style='{')

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input_session',
        help="The session which should be continued",
        required=True,
    )
    parser.add_argument(
        '--output',
        dest='output_session',
        help="The session after continuation ended",
        required=True,
    )
    parser.add_argument(
        '--run-id',
        dest='run_id',
        help="The run id of the workflow",
        required=True,
    )
    args = parser.parse_args(args=args)

    set_run_id(args.run_id)

    logger.info("Creating new session")
    session = Session()

    logger.info("Reading session from %s", args.output_session)
    with io.FileIO(args.input_session, mode='r') as stream:
        session.restore(stream)

    logger.info("Proceeding session")
    session.proceed()

    logger.info("Writing session with result to %s", args.output_session)
    with io.FileIO(args.output_session, mode='w') as stream:
        session.save(stream)

serialization special 🔗

Module for the package bandsaw.serialization. Contains all public types.

json 🔗

Contains a Serializer that allows to serialize objects to JSON.

JsonSerializer (Serializer) 🔗

A Serializer which serializes objects to JSON.

Attributes:

Name Type Description
value_serializers List[ValueSerializer]

A list of serializers that are used for serialization of custom types.

Source code in bandsaw/serialization/json.py
class JsonSerializer(Serializer):
    """
    A `Serializer` which serializes objects to JSON.

    Attributes:
        value_serializers (List[ValueSerializer]): A list of serializers that are used
            for serialization of custom types.
    """

    def __init__(self):
        super().__init__()
        self.value_serializers = []
        self.value_serializers.append(ExceptionSerializer())
        self.value_serializers.append(SerializableValueSerializer())
        self.value_serializers.append(TupleSerializer())

    def serialize(self, value, stream):
        """
        Serialize a value/object into a binary stream.

        Args:
            value (Any): The object/value to be serialized.
            stream (io.Stream): The binary stream where the serialized value is
                written.
        """
        text_stream = io.TextIOWrapper(stream, encoding='utf-8')
        json.dump(
            value,
            text_stream,
            cls=_ExtensibleJSONEncoder,
            value_serializers=self.value_serializers,
            allow_nan=False,
            indent=None,
            separators=(',', ':'),
            sort_keys=True,
        )
        text_stream.detach()

    def deserialize(self, stream):
        """
        Deserialize a value/object from a binary stream.

        Args:
            stream (io.Stream): The binary stream from where the serialized value is
                read.

        Returns:
             Any: The object/value which was deserialized.
        """
        text_stream = io.TextIOWrapper(stream, encoding='utf-8')
        result = json.load(
            text_stream,
            cls=_ExtensibleJSONDecoder,
            value_serializers=self.value_serializers,
        )
        text_stream.detach()
        return result
deserialize(self, stream) 🔗

Deserialize a value/object from a binary stream.

Parameters:

Name Type Description Default
stream io.Stream

The binary stream from where the serialized value is read.

required

Returns:

Type Description
Any

The object/value which was deserialized.

Source code in bandsaw/serialization/json.py
def deserialize(self, stream):
    """
    Deserialize a value/object from a binary stream.

    Args:
        stream (io.Stream): The binary stream from where the serialized value is
            read.

    Returns:
         Any: The object/value which was deserialized.
    """
    text_stream = io.TextIOWrapper(stream, encoding='utf-8')
    result = json.load(
        text_stream,
        cls=_ExtensibleJSONDecoder,
        value_serializers=self.value_serializers,
    )
    text_stream.detach()
    return result
serialize(self, value, stream) 🔗

Serialize a value/object into a binary stream.

Parameters:

Name Type Description Default
value Any

The object/value to be serialized.

required
stream io.Stream

The binary stream where the serialized value is written.

required
Source code in bandsaw/serialization/json.py
def serialize(self, value, stream):
    """
    Serialize a value/object into a binary stream.

    Args:
        value (Any): The object/value to be serialized.
        stream (io.Stream): The binary stream where the serialized value is
            written.
    """
    text_stream = io.TextIOWrapper(stream, encoding='utf-8')
    json.dump(
        value,
        text_stream,
        cls=_ExtensibleJSONEncoder,
        value_serializers=self.value_serializers,
        allow_nan=False,
        indent=None,
        separators=(',', ':'),
        sort_keys=True,
    )
    text_stream.detach()

pickle 🔗

Contains a Serializer which uses pickle for serializing values.

PickleSerializer (Serializer) 🔗

A Serializer which serializes objects using pickle.

Source code in bandsaw/serialization/pickle.py
class PickleSerializer(Serializer):
    """A `Serializer` which serializes objects using pickle."""

    def serialize(self, value, stream):
        """
        Serialize a value/object into a binary stream.

        Args:
            value (Any): The object/value to be serialized.
            stream (io.Stream): The binary stream where the serialized value is
                written.
        """
        pickle.dump(value, stream)

    def deserialize(self, stream):
        """
        Deserialize a value/object from a binary stream.

        Args:
            stream (io.Stream): The binary stream from where the serialized value is
                read.

        Returns:
             Any: The object/value which was deserialized.
        """
        return pickle.load(stream)
deserialize(self, stream) 🔗

Deserialize a value/object from a binary stream.

Parameters:

Name Type Description Default
stream io.Stream

The binary stream from where the serialized value is read.

required

Returns:

Type Description
Any

The object/value which was deserialized.

Source code in bandsaw/serialization/pickle.py
def deserialize(self, stream):
    """
    Deserialize a value/object from a binary stream.

    Args:
        stream (io.Stream): The binary stream from where the serialized value is
            read.

    Returns:
         Any: The object/value which was deserialized.
    """
    return pickle.load(stream)
serialize(self, value, stream) 🔗

Serialize a value/object into a binary stream.

Parameters:

Name Type Description Default
value Any

The object/value to be serialized.

required
stream io.Stream

The binary stream where the serialized value is written.

required
Source code in bandsaw/serialization/pickle.py
def serialize(self, value, stream):
    """
    Serialize a value/object into a binary stream.

    Args:
        value (Any): The object/value to be serialized.
        stream (io.Stream): The binary stream where the serialized value is
            written.
    """
    pickle.dump(value, stream)

serializer 🔗

Base classes for serializers which allow to serialize python values.

Serializer (ABC) 🔗

Interface for Serializer which serialize objects

Source code in bandsaw/serialization/serializer.py
class Serializer(abc.ABC):
    """Interface for `Serializer` which serialize objects"""

    @abc.abstractmethod
    def serialize(self, value, stream):
        """
        Serialize a value/object into a binary stream.

        Args:
            value (Any): The object/value to be serialized.
            stream (io.Stream): The binary stream where the serialized value is
                written.
        """

    @abc.abstractmethod
    def deserialize(self, stream):
        """
        Deserialize a value/object from a binary stream.

        Args:
            stream (io.Stream): The binary stream from where the serialized value is
                read.

        Returns:
             Any: The object/value which was deserialized.
        """
deserialize(self, stream) 🔗

Deserialize a value/object from a binary stream.

Parameters:

Name Type Description Default
stream io.Stream

The binary stream from where the serialized value is read.

required

Returns:

Type Description
Any

The object/value which was deserialized.

Source code in bandsaw/serialization/serializer.py
@abc.abstractmethod
def deserialize(self, stream):
    """
    Deserialize a value/object from a binary stream.

    Args:
        stream (io.Stream): The binary stream from where the serialized value is
            read.

    Returns:
         Any: The object/value which was deserialized.
    """
serialize(self, value, stream) 🔗

Serialize a value/object into a binary stream.

Parameters:

Name Type Description Default
value Any

The object/value to be serialized.

required
stream io.Stream

The binary stream where the serialized value is written.

required
Source code in bandsaw/serialization/serializer.py
@abc.abstractmethod
def serialize(self, value, stream):
    """
    Serialize a value/object into a binary stream.

    Args:
        value (Any): The object/value to be serialized.
        stream (io.Stream): The binary stream where the serialized value is
            written.
    """

values 🔗

A collection of classes for serializing custom objects.

ExceptionSerializer (ValueSerializer) 🔗

A ValueSerializer for serializing exceptions.

The serializer saves only the type and the args attribute of the exception, therefore it won't work for all exception types, but it should cover the most. Other attributes of the exception, e.g. stacktrace etc. are discarded.

Source code in bandsaw/serialization/values.py
class ExceptionSerializer(ValueSerializer):
    """
    A ValueSerializer for serializing exceptions.

    The serializer saves only the type and the `args` attribute of the exception,
    therefore it won't work for all exception types, but it should cover the most.
    Other attributes of the exception, e.g. stacktrace etc. are discarded.
    """

    def can_serialize_value(self, value):
        return isinstance(value, Exception)

    def serialize_value(self, value):
        state = {
            'type': type(value).__name__,
            'module': type(value).__module__,
            'args': value.args,
        }
        return state

    def deserialize_value(self, representation):
        module_name = representation['module']
        type_name = representation['type']
        module = importlib.import_module(module_name)
        value_type = getattr(module, type_name)
        return value_type(*representation['args'])
can_serialize_value(self, value) 🔗

Returns if a serializer can serialize a specific value.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
boolean

True if this serializer can serialize the given value, otherwise False.

Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
    return isinstance(value, Exception)
deserialize_value(self, representation) 🔗

Returns a deserialized value from its serialized representation.

Parameters:

Name Type Description Default
representation Any

The serialized representation of the value.

required

Returns:

Type Description
Any

The deserialized value.

Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
    module_name = representation['module']
    type_name = representation['type']
    module = importlib.import_module(module_name)
    value_type = getattr(module, type_name)
    return value_type(*representation['args'])
serialize_value(self, value) 🔗

Returns a serialized representation of the given value.

The returned representation can use standard python types like primitive values, lists or dicts.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
Any

The serialized representation of the value.

Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
    state = {
        'type': type(value).__name__,
        'module': type(value).__module__,
        'args': value.args,
    }
    return state

SerializableValue (ABC) 🔗

Interface for types that can serialize themselves.

Source code in bandsaw/serialization/values.py
class SerializableValue(abc.ABC):
    """Interface for types that can serialize themselves."""

    @abc.abstractmethod
    def serialized(self):
        """Returns a serializable representation of the value."""

    @classmethod
    @abc.abstractmethod
    def deserialize(cls, values):
        """Returns a new instance of a value from its serialized representation."""
deserialize(values) classmethod 🔗

Returns a new instance of a value from its serialized representation.

Source code in bandsaw/serialization/values.py
@classmethod
@abc.abstractmethod
def deserialize(cls, values):
    """Returns a new instance of a value from its serialized representation."""
serialized(self) 🔗

Returns a serializable representation of the value.

Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def serialized(self):
    """Returns a serializable representation of the value."""

SerializableValueSerializer (ValueSerializer) 🔗

A ValueSerializer for serializing subclasses of SerializableValue.

The serializer uses the methods defined in SerializableValue and implemented by the individual classes to serialize values. It stores the type of the value and its serialized representation and allows to recreate the value from this information.

Source code in bandsaw/serialization/values.py
class SerializableValueSerializer(ValueSerializer):
    """
    A ValueSerializer for serializing subclasses of `SerializableValue`.

    The serializer uses the methods defined in `SerializableValue` and implemented
    by the individual classes to serialize values. It stores the type of the value
    and its serialized representation and allows to recreate the value from this
    information.
    """

    def can_serialize_value(self, value):
        return isinstance(value, SerializableValue)

    def serialize_value(self, value):
        state = {
            'type': type(value).__name__,
            'module': type(value).__module__,
            'serialized': value.serialized(),
        }
        return state

    def deserialize_value(self, representation):
        module_name = representation['module']
        type_name = representation['type']
        module = importlib.import_module(module_name)
        value_type = getattr(module, type_name)
        return value_type.deserialize(representation['serialized'])
can_serialize_value(self, value) 🔗

Returns if a serializer can serialize a specific value.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
boolean

True if this serializer can serialize the given value, otherwise False.

Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
    return isinstance(value, SerializableValue)
deserialize_value(self, representation) 🔗

Returns a deserialized value from its serialized representation.

Parameters:

Name Type Description Default
representation Any

The serialized representation of the value.

required

Returns:

Type Description
Any

The deserialized value.

Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
    module_name = representation['module']
    type_name = representation['type']
    module = importlib.import_module(module_name)
    value_type = getattr(module, type_name)
    return value_type.deserialize(representation['serialized'])
serialize_value(self, value) 🔗

Returns a serialized representation of the given value.

The returned representation can use standard python types like primitive values, lists or dicts.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
Any

The serialized representation of the value.

Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
    state = {
        'type': type(value).__name__,
        'module': type(value).__module__,
        'serialized': value.serialized(),
    }
    return state

TupleSerializer (ValueSerializer) 🔗

A ValueSerializer for serializing tuples.

The serializer supports normal tuples as well as named tuples. When namedtuples are deserialized it first tries to reuse an existing namedtople type. If the type can't be imported or reused, a new namedtuple type with the same name and fields is created on the fly.

Source code in bandsaw/serialization/values.py
class TupleSerializer(ValueSerializer):
    """
    A ValueSerializer for serializing tuples.

    The serializer supports normal tuples as well as named tuples. When namedtuples
    are deserialized it first tries to reuse an existing namedtople type. If the type
    can't be imported or reused, a new namedtuple type with the same name and fields
    is created on the fly.
    """

    def can_serialize_value(self, value):
        return isinstance(value, tuple)

    def serialize_value(self, value):
        if hasattr(value, '_fields'):
            state = {
                'type': 'namedtuple',
                'fields': list(value._fields),
                'name': type(value).__name__,
                'module': type(value).__module__,
                'items': list(value),
            }
        else:
            state = {
                'type': 'tuple',
                'items': list(value),
            }

        return state

    def deserialize_value(self, representation):
        if representation['type'] == 'namedtuple':
            # try to import the namedtuple type
            module_name = representation['module']
            type_name = representation['name']
            try:
                module = importlib.import_module(module_name)
                tuple_type = getattr(module, type_name)
            except (ImportError, AttributeError) as error:
                logger.warning(
                    "Error importing namedtuple, trying to recreate it: %s", error
                )
                # Recreate a new type
                field_names = ' '.join(representation['fields'])
                tuple_type = collections.namedtuple(
                    type_name, field_names, module=module_name
                )
            return tuple_type(*representation['items'])

        return tuple(representation['items'])
can_serialize_value(self, value) 🔗

Returns if a serializer can serialize a specific value.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
boolean

True if this serializer can serialize the given value, otherwise False.

Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
    return isinstance(value, tuple)
deserialize_value(self, representation) 🔗

Returns a deserialized value from its serialized representation.

Parameters:

Name Type Description Default
representation Any

The serialized representation of the value.

required

Returns:

Type Description
Any

The deserialized value.

Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
    if representation['type'] == 'namedtuple':
        # try to import the namedtuple type
        module_name = representation['module']
        type_name = representation['name']
        try:
            module = importlib.import_module(module_name)
            tuple_type = getattr(module, type_name)
        except (ImportError, AttributeError) as error:
            logger.warning(
                "Error importing namedtuple, trying to recreate it: %s", error
            )
            # Recreate a new type
            field_names = ' '.join(representation['fields'])
            tuple_type = collections.namedtuple(
                type_name, field_names, module=module_name
            )
        return tuple_type(*representation['items'])

    return tuple(representation['items'])
serialize_value(self, value) 🔗

Returns a serialized representation of the given value.

The returned representation can use standard python types like primitive values, lists or dicts.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
Any

The serialized representation of the value.

Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
    if hasattr(value, '_fields'):
        state = {
            'type': 'namedtuple',
            'fields': list(value._fields),
            'name': type(value).__name__,
            'module': type(value).__module__,
            'items': list(value),
        }
    else:
        state = {
            'type': 'tuple',
            'items': list(value),
        }

    return state

ValueSerializer (ABC) 🔗

Interface for serializers that can serialize custom values.

Source code in bandsaw/serialization/values.py
class ValueSerializer(abc.ABC):
    """
    Interface for serializers that can serialize custom values.
    """

    @abc.abstractmethod
    def can_serialize_value(self, value):
        """
        Returns if a serializer can serialize a specific value.

        Args:
            value (Any): The value that should be serialized.

        Returns:
            boolean: `True` if this serializer can serialize the given value,
                otherwise `False`.
        """

    @abc.abstractmethod
    def serialize_value(self, value):
        """
        Returns a serialized representation of the given value.

        The returned representation can use standard python types like primitive
        values, lists or dicts.

        Args:
            value (Any): The value that should be serialized.

        Returns:
            Any: The serialized representation of the value.
        """

    @abc.abstractmethod
    def deserialize_value(self, representation):
        """
        Returns a deserialized value from its serialized representation.

        Args:
            representation (Any): The serialized representation of the value.

        Returns:
            Any: The deserialized value.
        """
can_serialize_value(self, value) 🔗

Returns if a serializer can serialize a specific value.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
boolean

True if this serializer can serialize the given value, otherwise False.

Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def can_serialize_value(self, value):
    """
    Returns if a serializer can serialize a specific value.

    Args:
        value (Any): The value that should be serialized.

    Returns:
        boolean: `True` if this serializer can serialize the given value,
            otherwise `False`.
    """
deserialize_value(self, representation) 🔗

Returns a deserialized value from its serialized representation.

Parameters:

Name Type Description Default
representation Any

The serialized representation of the value.

required

Returns:

Type Description
Any

The deserialized value.

Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def deserialize_value(self, representation):
    """
    Returns a deserialized value from its serialized representation.

    Args:
        representation (Any): The serialized representation of the value.

    Returns:
        Any: The deserialized value.
    """
serialize_value(self, value) 🔗

Returns a serialized representation of the given value.

The returned representation can use standard python types like primitive values, lists or dicts.

Parameters:

Name Type Description Default
value Any

The value that should be serialized.

required

Returns:

Type Description
Any

The serialized representation of the value.

Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def serialize_value(self, value):
    """
    Returns a serialized representation of the given value.

    The returned representation can use standard python types like primitive
    values, lists or dicts.

    Args:
        value (Any): The value that should be serialized.

    Returns:
        Any: The serialized representation of the value.
    """

session 🔗

Contains classes for representing an advising session

Attachment (ABC) 🔗

Class that represents a single file that as been attached to a session.

Source code in bandsaw/session.py
class Attachment(abc.ABC):
    """
    Class that represents a single file that as been attached to a session.
    """

    @abc.abstractmethod
    def open(self):
        """
        Opens the attachment for reading.

        Returns:
            io.RawIOBase: binary stream for reading.
        """

    @property
    @abc.abstractmethod
    def size(self):
        """Return the size of the attachment in bytes"""

size property readonly 🔗

Return the size of the attachment in bytes

open(self) 🔗

Opens the attachment for reading.

Returns:

Type Description
io.RawIOBase

binary stream for reading.

Source code in bandsaw/session.py
@abc.abstractmethod
def open(self):
    """
    Opens the attachment for reading.

    Returns:
        io.RawIOBase: binary stream for reading.
    """

Attachments (Mapping) 🔗

A mapping that contains attachments.

Attachments can only be added, but neither deleted nor overwritten. Their names must be valid file names without directories.

Attachments itself is a mapping class and can be used similar to a dictionary. When a new attachments is added, it must be path to an existing file, either as str or pathlib.Path. When an attachment is accessed, an object of type Attachment is returned, that gives access to the size of the attachment and allows to read its content.

Examples:

>>> attachments = Attachments()
>>> attachments['my.attachment'] = '/path/to/file'
>>> attachment = attachments['my.attachment']
>>> attachment.size
1234
>>> attachment.open().readall()
b'My binary file content.'
Source code in bandsaw/session.py
class Attachments(collections.abc.Mapping):
    """
    A mapping that contains attachments.

    Attachments can only be added, but neither deleted nor overwritten. Their names
    must be valid file names without directories.

    Attachments itself is a mapping class and can be used similar to a dictionary.
    When a new attachments is added, it must be path to an existing file, either as
    `str` or `pathlib.Path`. When an attachment is accessed, an object of type
    `Attachment` is returned, that gives access to the size of the attachment and
    allows to read its content.

    Examples:
        >>> attachments = Attachments()
        >>> attachments['my.attachment'] = '/path/to/file'
        >>> attachment = attachments['my.attachment']
        >>> attachment.size
        1234
        >>> attachment.open().readall()
        b'My binary file content.'

    """

    def __init__(self, zip_file=None):
        """
        Creates a new container for attachments.

        Args:
            zip_file (zipfile.ZipFile): An already existing zip file, which can be
                used for initializing with pre-existing attachments.
        """
        self._items = {}
        if zip_file is not None:
            self._add_attachments_from_zip(zip_file)

    def _add_attachments_from_zip(self, zip_file):
        for file_path in zip_file.namelist():
            if file_path[:12] == 'attachments/':
                attachment_name = file_path.split('/', 1)[1]
                self._items[attachment_name] = _ZipAttachment(zip_file, file_path)

    def store(self, zip_file):
        """
        Stores all attachments in a zip file.

        Args:
            zip_file (zipfile.ZipFile): The zip file where the attachments should be
                stored in.
        """
        for name, attachment in self._items.items():
            with attachment.open() as stream:
                zip_file.writestr('attachments/' + name, stream.read())

    def __setitem__(self, key, path):
        if key in self._items:
            raise KeyError(f"Attachment '{key}' does already exist")
        if isinstance(path, str):
            path = pathlib.Path(path)
        if not isinstance(path, pathlib.Path):
            raise TypeError("Invalid type for value, must be str or Path")
        if not path.exists():
            raise ValueError("File does not exist")
        if not path.is_file():
            raise ValueError("Path is not a file")
        self._items[key] = _FileAttachment(path)

    def __getitem__(self, key):
        return self._items[key]

    def __iter__(self):
        return iter(self._items)

    def __len__(self):
        return len(self._items)

__init__(self, zip_file=None) special 🔗

Creates a new container for attachments.

Parameters:

Name Type Description Default
zip_file zipfile.ZipFile

An already existing zip file, which can be used for initializing with pre-existing attachments.

None
Source code in bandsaw/session.py
def __init__(self, zip_file=None):
    """
    Creates a new container for attachments.

    Args:
        zip_file (zipfile.ZipFile): An already existing zip file, which can be
            used for initializing with pre-existing attachments.
    """
    self._items = {}
    if zip_file is not None:
        self._add_attachments_from_zip(zip_file)

store(self, zip_file) 🔗

Stores all attachments in a zip file.

Parameters:

Name Type Description Default
zip_file zipfile.ZipFile

The zip file where the attachments should be stored in.

required
Source code in bandsaw/session.py
def store(self, zip_file):
    """
    Stores all attachments in a zip file.

    Args:
        zip_file (zipfile.ZipFile): The zip file where the attachments should be
            stored in.
    """
    for name, attachment in self._items.items():
        with attachment.open() as stream:
            zip_file.writestr('attachments/' + name, stream.read())

Ids 🔗

Class that encapsulates the ids of a session.

Attributes:

Name Type Description
task_id str

The id of the task in this session.

execution_id str

The id of the execution of the task in this session.

run_id str

The id of the current run.

session_id str

The id of the session. The id is a combination of the three other ids.

Source code in bandsaw/session.py
class Ids:
    """
    Class that encapsulates the ids of a session.

    Attributes:
        task_id (str): The id of the task in this session.
        execution_id (str): The id of the execution of the task in this session.
        run_id (str): The id of the current run.
        session_id (str): The id of the session. The id is a combination of the three
            other ids.
    """

    slots = ('task_id', 'execution_id', 'run_id', '_session_id')

    def __init__(self, task_id, execution_id, run_id):
        self.task_id = task_id
        self.execution_id = execution_id
        self.run_id = run_id
        self.session_id = "_".join(
            [self.task_id, self.execution_id, self.run_id],
        )

    def __str__(self):
        return self.session_id

    def as_path(self):
        """
        Returns a relative path derived from the ids.

        Returns:
            pathlib.Path: relative path that uses the individual ids as components.
        """
        return pathlib.Path(self.task_id) / self.execution_id / self.run_id

    @classmethod
    def from_string(cls, id_as_string):
        """Create new Ids object from its string representation."""
        return Ids(*(id_as_string.split('_')))

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self.session_id == other.session_id

    def __hash__(self):
        return hash(self.session_id)

as_path(self) 🔗

Returns a relative path derived from the ids.

Returns:

Type Description
pathlib.Path

relative path that uses the individual ids as components.

Source code in bandsaw/session.py
def as_path(self):
    """
    Returns a relative path derived from the ids.

    Returns:
        pathlib.Path: relative path that uses the individual ids as components.
    """
    return pathlib.Path(self.task_id) / self.execution_id / self.run_id

from_string(id_as_string) classmethod 🔗

Create new Ids object from its string representation.

Source code in bandsaw/session.py
@classmethod
def from_string(cls, id_as_string):
    """Create new Ids object from its string representation."""
    return Ids(*(id_as_string.split('_')))

Session 🔗

Class that handles the advising of an execution.

A Session object is given to the individual advices that are called to advise the execution. By calling the appropriate methods like def proceed(self) to continue or conclude() to end with a result, the advices can influence the final result. Additionally, the session provides access to the context, which allows advices to keep state, the execution that is advised, the configuration that is used for advising and the result of the execution.

Attributes:

Name Type Description
task bandsaw.tasks.Task

The task that is executed.

execution bandsaw.execution.Execution

The execution arguments for the task.

context bandsaw.context.Context

The context that can be used for advices to store state.

result bandsaw.result.Result

Result of the task if already computed. Otherwise, None.

attachments bandsaw.session.Attachments

A mapping of files that have been attached to the session.

configuration bandsaw.config.Configuration

The configuration that is being used for advising this task.

Source code in bandsaw/session.py
class Session:
    """
    Class that handles the advising of an execution.

    A `Session` object is given to the individual advices that are called to advise
    the execution. By calling the appropriate methods like `def proceed(self)` to
    continue or `conclude()` to end with a result, the advices can influence the final
    result.
    Additionally, the session provides access to the `context`, which allows advices
    to keep state, the `execution` that is advised, the `configuration` that is used
    for advising and the `result` of the execution.

    Attributes:
        task (bandsaw.tasks.Task): The task that is executed.
        execution (bandsaw.execution.Execution): The execution arguments for the task.
        context (bandsaw.context.Context): The context that can be used for advices
            to store state.
        result (bandsaw.result.Result): Result of the task if already computed.
            Otherwise, `None`.
        attachments (bandsaw.session.Attachments): A mapping of files that have been
            attached to the session.
        configuration (bandsaw.config.Configuration): The configuration that is being
            used for advising this task.
    """

    # pylint: disable=too-many-instance-attributes
    # is reasonable in this case.

    def __init__(
        self,
        task=None,
        execution=None,
        configuration=None,
        advice_chain='default',
    ):
        """
        Create a new session.

        """
        self.task = task
        self.execution = execution
        self.context = {}
        self.result = None
        self.attachments = Attachments()
        self.configuration = configuration
        self._advice_chain = advice_chain
        self._moderator = None
        self._ids = None
        self._temp_dir = None

    def initiate(self):
        """
        Start the process of advising an execution.

        Returns:
            bandsaw.result.Result: The final result of the execution after all
                advices.
        """

        self._moderator = _Moderator(
            self.configuration.get_advice_chain(self._advice_chain)
        )

        logger.debug("running extensions before advice")
        for extension in self.configuration.extensions:
            extension.on_session_created(self)

        self.proceed()

        if not self._moderator.is_finished:
            raise RuntimeError(
                f"Not all advice has been applied. "
                f"Misbehaving advice {self._moderator.current_advice}"
            )

        logger.debug("running extensions after advice")
        for extension in self.configuration.extensions:
            extension.on_session_finished(self)

        return self.result

    @property
    def ids(self):
        """The ids of this session."""
        if self._ids is None:
            if self.task is None or self.execution is None:
                raise ValueError("Incomplete session, missing task or execution.")
            self._ids = Ids(
                self.task.task_id,
                self.execution.execution_id,
                self.run_id,
            )
        return self._ids

    @property
    def session_id(self):
        """The id of this session as string."""
        return str(self.ids)

    @property
    def serializer(self):
        """The serializer that can be used for serializing values."""
        return self.configuration.serializer

    @property
    def distribution_archive(self):
        """The DistributionArchive which can be used when transferring the session."""
        return get_distribution_archive(self.configuration)

    @property
    def run_id(self):
        """The run id of the workflow."""
        return get_run_id()

    @property
    def temp_dir(self):
        """
        Temporary directory where session specific files can be written to.

        This directory is meant for storing temporary files, that are used by the
        individual `Advice` instances. The directory is already created and will be
        automatically deleted with the end of the python interpreter, nonetheless,
        the advices writing files to the directory should if possible take care of
        removing them if no longer needed.

        Returns:
            pathlib.Path: Path to the temporary directory.
        """
        if self._temp_dir is None:
            self._temp_dir = self.configuration.temporary_directory / self.ids.as_path()
            self._temp_dir.mkdir(parents=True, exist_ok=True)
        return self._temp_dir

    def proceed(self):
        """
        Continue the process of advising with the next advice.
        """
        self._moderator.next(self)

    def conclude(self, result):
        """
        Conclude the process of advising with a `Result`.

        This can be used in two cases:

        1. Concluding BEFORE the task was actually executed. This will skip all
           subsequent advices defined later in the advice chain and will skip the
           task execution. The given `result` will then be used as preliminary result.
           All advices that are defined before the calling advice in the advice chain
           will still be called with there `after(session)` method.

        2. Concluding AFTER the task was actually executed. This will just change the
           `result` of the session and continue will all following advices.

        Args:
            result (bandsaw.result.Result): The result to conclude with.
        """
        self.result = result
        self._moderator.skip(self)

    def save(self, stream):
        """
        Suspend the session to be resumed later or elsewhere.
        """
        self._store_as_zip(stream)

    def restore(self, stream):
        """
        Resume a prior suspended session.
        """
        self._load_from_zip(stream)
        return self

    def _load_from_zip(self, stream):

        # We don't use with here, because we don't want to close the zip file
        # This allows the attachment's container, to access the attachments from the
        # archive
        archive = zipfile.ZipFile(stream, 'r')  # pylint: disable=consider-using-with

        session_json = json.loads(archive.read('session.json'))
        self.configuration = get_configuration(session_json['configuration'])
        self._advice_chain = session_json['advice_chain']
        self._ids = Ids.from_string(session_json['ids'])

        serializer = self.configuration.serializer

        stream = io.BytesIO(archive.read('task.dat'))
        self.task = serializer.deserialize(stream)

        stream = io.BytesIO(archive.read('execution.dat'))
        self.execution = serializer.deserialize(stream)

        stream = io.BytesIO(archive.read('context.dat'))
        self.context = serializer.deserialize(stream)

        stream = io.BytesIO(archive.read('result.dat'))
        self.result = serializer.deserialize(stream)

        stream = io.BytesIO(archive.read('moderator.dat'))
        self._moderator = serializer.deserialize(stream)
        if self._moderator is not None:
            self._moderator.advice_chain = self.configuration.get_advice_chain(
                self._advice_chain
            )

        self.attachments = Attachments(archive)

    def _store_as_zip(self, stream):
        serializer = self.configuration.serializer

        with zipfile.ZipFile(stream, 'w') as archive:
            session_json = json.dumps(
                {
                    'configuration': self.configuration.module_name,
                    'advice_chain': self._advice_chain,
                    'ids': str(self.ids),
                }
            )
            archive.writestr('session.json', session_json)

            stream = io.BytesIO()
            serializer.serialize(self.task, stream)
            archive.writestr('task.dat', stream.getvalue())

            stream = io.BytesIO()
            serializer.serialize(self.execution, stream)
            archive.writestr('execution.dat', stream.getvalue())

            stream = io.BytesIO()
            serializer.serialize(self.context, stream)
            archive.writestr('context.dat', stream.getvalue())

            stream = io.BytesIO()
            serializer.serialize(self.result, stream)
            archive.writestr('result.dat', stream.getvalue())

            stream = io.BytesIO()
            serializer.serialize(self._moderator, stream)
            archive.writestr('moderator.dat', stream.getvalue())

            self.attachments.store(archive)

distribution_archive property readonly 🔗

The DistributionArchive which can be used when transferring the session.

ids property readonly 🔗

The ids of this session.

run_id property readonly 🔗

The run id of the workflow.

serializer property readonly 🔗

The serializer that can be used for serializing values.

session_id property readonly 🔗

The id of this session as string.

temp_dir property readonly 🔗

Temporary directory where session specific files can be written to.

This directory is meant for storing temporary files, that are used by the individual Advice instances. The directory is already created and will be automatically deleted with the end of the python interpreter, nonetheless, the advices writing files to the directory should if possible take care of removing them if no longer needed.

Returns:

Type Description
pathlib.Path

Path to the temporary directory.

__init__(self, task=None, execution=None, configuration=None, advice_chain='default') special 🔗

Create a new session.

Source code in bandsaw/session.py
def __init__(
    self,
    task=None,
    execution=None,
    configuration=None,
    advice_chain='default',
):
    """
    Create a new session.

    """
    self.task = task
    self.execution = execution
    self.context = {}
    self.result = None
    self.attachments = Attachments()
    self.configuration = configuration
    self._advice_chain = advice_chain
    self._moderator = None
    self._ids = None
    self._temp_dir = None

conclude(self, result) 🔗

Conclude the process of advising with a Result.

This can be used in two cases:

  1. Concluding BEFORE the task was actually executed. This will skip all subsequent advices defined later in the advice chain and will skip the task execution. The given result will then be used as preliminary result. All advices that are defined before the calling advice in the advice chain will still be called with there after(session) method.

  2. Concluding AFTER the task was actually executed. This will just change the result of the session and continue will all following advices.

Parameters:

Name Type Description Default
result bandsaw.result.Result

The result to conclude with.

required
Source code in bandsaw/session.py
def conclude(self, result):
    """
    Conclude the process of advising with a `Result`.

    This can be used in two cases:

    1. Concluding BEFORE the task was actually executed. This will skip all
       subsequent advices defined later in the advice chain and will skip the
       task execution. The given `result` will then be used as preliminary result.
       All advices that are defined before the calling advice in the advice chain
       will still be called with there `after(session)` method.

    2. Concluding AFTER the task was actually executed. This will just change the
       `result` of the session and continue will all following advices.

    Args:
        result (bandsaw.result.Result): The result to conclude with.
    """
    self.result = result
    self._moderator.skip(self)

initiate(self) 🔗

Start the process of advising an execution.

Returns:

Type Description
bandsaw.result.Result

The final result of the execution after all advices.

Source code in bandsaw/session.py
def initiate(self):
    """
    Start the process of advising an execution.

    Returns:
        bandsaw.result.Result: The final result of the execution after all
            advices.
    """

    self._moderator = _Moderator(
        self.configuration.get_advice_chain(self._advice_chain)
    )

    logger.debug("running extensions before advice")
    for extension in self.configuration.extensions:
        extension.on_session_created(self)

    self.proceed()

    if not self._moderator.is_finished:
        raise RuntimeError(
            f"Not all advice has been applied. "
            f"Misbehaving advice {self._moderator.current_advice}"
        )

    logger.debug("running extensions after advice")
    for extension in self.configuration.extensions:
        extension.on_session_finished(self)

    return self.result

proceed(self) 🔗

Continue the process of advising with the next advice.

Source code in bandsaw/session.py
def proceed(self):
    """
    Continue the process of advising with the next advice.
    """
    self._moderator.next(self)

restore(self, stream) 🔗

Resume a prior suspended session.

Source code in bandsaw/session.py
def restore(self, stream):
    """
    Resume a prior suspended session.
    """
    self._load_from_zip(stream)
    return self

save(self, stream) 🔗

Suspend the session to be resumed later or elsewhere.

Source code in bandsaw/session.py
def save(self, stream):
    """
    Suspend the session to be resumed later or elsewhere.
    """
    self._store_as_zip(stream)

tasks 🔗

Contains classes and functions representing different types of tasks

Task (SerializableValue, ABC) 🔗

Base-class for different types of Tasks that can be executed

Attributes:

Name Type Description
task_id str

A unique identifier for the individual tasks.

advice_parameters dict

A dictionary with additional arguments provided at task definition.

source str

The python source code as string which defines the task.

bytecode bytes

The compiled byte code of the task definition.

Source code in bandsaw/tasks.py
class Task(SerializableValue, abc.ABC):
    """Base-class for different types of `Tasks` that can be executed

    Attributes:
        task_id (str): A unique identifier for the individual tasks.
        advice_parameters (dict): A dictionary with additional arguments provided at
            task definition.
        source (str): The python source code as string which defines the task.
        bytecode (bytes): The compiled byte code of the task definition.
    """

    # For different types of callable
    # https://stackoverflow.com/questions/19314405/how-to-detect-is-decorator-has-been-applied-to-method-or-function

    def __init__(self, task_id, advice_parameters):
        self.task_id = task_id
        self._advice_parameters = advice_parameters

    @property
    def advice_parameters(self):
        """Additional parameters for advices defined at task definition."""
        return dict(self._advice_parameters)

    @property
    @abc.abstractmethod
    def source(self):
        """The python source code as `str` which defines the task."""

    @property
    @abc.abstractmethod
    def bytecode(self):
        """The compiled byte code of the task definition as `bytes`."""

    @property
    @abc.abstractmethod
    def signature(self):
        """The signature() of the callable representing this task."""

    @abc.abstractmethod
    def _execute(self, args, kwargs):
        """
        Execute the task with the given arguments.

        Args:
            args: The positional arguments to use during execution.
            kwargs: The keyword arguments to use during execution.

        Returns:
            Any: The returned value from the task.

        Raises:
            Any: During the execution the task can raise arbitrary exceptions.
        """

    def execute(self, execution):
        """
        Execute the task with the arguments specified by the execution.

        Args:
            execution (bandsaw.execution.Execution): The definition which contains how
                the task should be executed.

        Returns:
            bandsaw.result.Result: A `Result` object with either the returned value
                from the task or an exception that was raised by the task.
        """
        try:
            result_value = self._execute(execution.args, execution.kwargs)
            result = Result(value=result_value)
        except Exception as error:  # pylint: disable=W0703 # too general exception
            result = Result(exception=error)
        return result

    @classmethod
    def create_task(cls, obj, advice_parameters=None):
        """
        Factory for creating a task for different Python objects.

        Args:
            obj (Any): Python object that should be run as a task.
            advice_parameters (dict): A dictionary containing additional arguments to
                be used by the advices.

        Returns:
            bandsaw.tasks.Task: Instance of `Task` class that allows to execute the
                task.

        Raises:
            TypeError: If there is no support for this type of python object.
        """
        if advice_parameters is None:
            advice_parameters = {}
        if isinstance(obj, types.FunctionType):
            if '.<locals>.' in obj.__qualname__:
                return _FunctionWithClosureTask(obj, advice_parameters)
            function_name, module_name = object_as_import(obj)
            return _FunctionTask(function_name, module_name, advice_parameters)
        raise TypeError(f"Unsupported task object of type {type(obj)}")

advice_parameters property readonly 🔗

Additional parameters for advices defined at task definition.

bytecode property readonly 🔗

The compiled byte code of the task definition as bytes.

signature property readonly 🔗

The signature() of the callable representing this task.

source property readonly 🔗

The python source code as str which defines the task.

create_task(obj, advice_parameters=None) classmethod 🔗

Factory for creating a task for different Python objects.

Parameters:

Name Type Description Default
obj Any

Python object that should be run as a task.

required
advice_parameters dict

A dictionary containing additional arguments to be used by the advices.

None

Returns:

Type Description
bandsaw.tasks.Task

Instance of Task class that allows to execute the task.

Exceptions:

Type Description
TypeError

If there is no support for this type of python object.

Source code in bandsaw/tasks.py
@classmethod
def create_task(cls, obj, advice_parameters=None):
    """
    Factory for creating a task for different Python objects.

    Args:
        obj (Any): Python object that should be run as a task.
        advice_parameters (dict): A dictionary containing additional arguments to
            be used by the advices.

    Returns:
        bandsaw.tasks.Task: Instance of `Task` class that allows to execute the
            task.

    Raises:
        TypeError: If there is no support for this type of python object.
    """
    if advice_parameters is None:
        advice_parameters = {}
    if isinstance(obj, types.FunctionType):
        if '.<locals>.' in obj.__qualname__:
            return _FunctionWithClosureTask(obj, advice_parameters)
        function_name, module_name = object_as_import(obj)
        return _FunctionTask(function_name, module_name, advice_parameters)
    raise TypeError(f"Unsupported task object of type {type(obj)}")

execute(self, execution) 🔗

Execute the task with the arguments specified by the execution.

Parameters:

Name Type Description Default
execution bandsaw.execution.Execution

The definition which contains how the task should be executed.

required

Returns:

Type Description
bandsaw.result.Result

A Result object with either the returned value from the task or an exception that was raised by the task.

Source code in bandsaw/tasks.py
def execute(self, execution):
    """
    Execute the task with the arguments specified by the execution.

    Args:
        execution (bandsaw.execution.Execution): The definition which contains how
            the task should be executed.

    Returns:
        bandsaw.result.Result: A `Result` object with either the returned value
            from the task or an exception that was raised by the task.
    """
    try:
        result_value = self._execute(execution.args, execution.kwargs)
        result = Result(value=result_value)
    except Exception as error:  # pylint: disable=W0703 # too general exception
        result = Result(exception=error)
    return result

tracking special 🔗

backend 🔗

Interface for tracking backends

Backend 🔗

Base class for backend implementations

Source code in bandsaw/tracking/backend.py
class Backend:
    """Base class for backend implementations"""

    def track_run(self, ids, run_info):
        """
        Track a run

        Args:
            ids (bandsaw.session.Ids): Ids where the run was first used.
            run_info (Dict[str,Any]): A dictionary containing tracking information for
                this run.
        """

    def track_distribution_archive(self, distribution_archive):
        """
        Track a distribution archive.

        Args:
            distribution_archive (bandsaw.distribution.DistributionArchive): The
                archive which should be tracked.
        """

    def track_task(self, ids, task_info):
        """
        Track a task.

        Args:
            ids (bandsaw.session.Ids): Ids where task was first used.
            task_info (Dict[str,Any]): A dictionary containing tracking information
                for a task.
        """

    def track_execution(self, ids, execution_info):
        """
        Track an execution.

        Args:
            ids (bandsaw.session.Ids): Ids where task was first used.
            execution_info (Dict[str,Any]): A dictionary containing tracking
                information for the execution.
        """

    def track_session(self, ids, session_info):
        """
        Track a session.

        Args:
            ids (bandsaw.session.Ids): Ids where task was first used.
            session_info (Dict[str,Any]): A dictionary containing tracking
                information for this session.
        """

    def track_result(self, ids, result_info):
        """
        Track the result of a session.

        Args:
            ids (bandsaw.session.Ids): Ids where task was first used.
            result_info (Dict[str,Any]): A dictionary containing tracking
                information for this result.
        """

    def track_attachments(self, ids, attachments):
        """
        Track the attachments of a session.

        Args:
            ids (bandsaw.session.Ids): Ids where task was first used.
            attachments (bandsaw.session.Attachments): An instance of `Attachments`
                which gives access to the files that were attached to a session.
        """
track_attachments(self, ids, attachments) 🔗

Track the attachments of a session.

Parameters:

Name Type Description Default
ids bandsaw.session.Ids

Ids where task was first used.

required
attachments bandsaw.session.Attachments

An instance of Attachments which gives access to the files that were attached to a session.

required
Source code in bandsaw/tracking/backend.py
def track_attachments(self, ids, attachments):
    """
    Track the attachments of a session.

    Args:
        ids (bandsaw.session.Ids): Ids where task was first used.
        attachments (bandsaw.session.Attachments): An instance of `Attachments`
            which gives access to the files that were attached to a session.
    """
track_distribution_archive(self, distribution_archive) 🔗

Track a distribution archive.

Parameters:

Name Type Description Default
distribution_archive bandsaw.distribution.DistributionArchive

The archive which should be tracked.

required
Source code in bandsaw/tracking/backend.py
def track_distribution_archive(self, distribution_archive):
    """
    Track a distribution archive.

    Args:
        distribution_archive (bandsaw.distribution.DistributionArchive): The
            archive which should be tracked.
    """
track_execution(self, ids, execution_info) 🔗

Track an execution.

Parameters:

Name Type Description Default
ids bandsaw.session.Ids

Ids where task was first used.

required
execution_info Dict[str,Any]

A dictionary containing tracking information for the execution.

required
Source code in bandsaw/tracking/backend.py
def track_execution(self, ids, execution_info):
    """
    Track an execution.

    Args:
        ids (bandsaw.session.Ids): Ids where task was first used.
        execution_info (Dict[str,Any]): A dictionary containing tracking
            information for the execution.
    """
track_result(self, ids, result_info) 🔗

Track the result of a session.

Parameters:

Name Type Description Default
ids bandsaw.session.Ids

Ids where task was first used.

required
result_info Dict[str,Any]

A dictionary containing tracking information for this result.

required
Source code in bandsaw/tracking/backend.py
def track_result(self, ids, result_info):
    """
    Track the result of a session.

    Args:
        ids (bandsaw.session.Ids): Ids where task was first used.
        result_info (Dict[str,Any]): A dictionary containing tracking
            information for this result.
    """
track_run(self, ids, run_info) 🔗

Track a run

Parameters:

Name Type Description Default
ids bandsaw.session.Ids

Ids where the run was first used.

required
run_info Dict[str,Any]

A dictionary containing tracking information for this run.

required
Source code in bandsaw/tracking/backend.py
def track_run(self, ids, run_info):
    """
    Track a run

    Args:
        ids (bandsaw.session.Ids): Ids where the run was first used.
        run_info (Dict[str,Any]): A dictionary containing tracking information for
            this run.
    """
track_session(self, ids, session_info) 🔗

Track a session.

Parameters:

Name Type Description Default
ids bandsaw.session.Ids

Ids where task was first used.

required
session_info Dict[str,Any]

A dictionary containing tracking information for this session.

required
Source code in bandsaw/tracking/backend.py
def track_session(self, ids, session_info):
    """
    Track a session.

    Args:
        ids (bandsaw.session.Ids): Ids where task was first used.
        session_info (Dict[str,Any]): A dictionary containing tracking
            information for this session.
    """
track_task(self, ids, task_info) 🔗

Track a task.

Parameters:

Name Type Description Default
ids bandsaw.session.Ids

Ids where task was first used.

required
task_info Dict[str,Any]

A dictionary containing tracking information for a task.

required
Source code in bandsaw/tracking/backend.py
def track_task(self, ids, task_info):
    """
    Track a task.

    Args:
        ids (bandsaw.session.Ids): Ids where task was first used.
        task_info (Dict[str,Any]): A dictionary containing tracking information
            for a task.
    """

filesystem 🔗

Tracking backend using filesystem

FileSystemBackend (Backend) 🔗

Tracking backend that stores data in the local file system.

Source code in bandsaw/tracking/filesystem.py
class FileSystemBackend(Backend):
    """Tracking backend that stores data in the local file system."""

    def __init__(self, directory):
        """
        Create a new backend.

        Args:
            directory (str): Directory where the tracking data will be stored.
        """
        self.directory = pathlib.Path(directory)
        logger.info("Tracking sessions in directory '%s'", self.directory)
        super().__init__()

    def track_run(self, ids, run_info):
        run_dir = self.directory / 'runs' / ids.run_id
        run_dir.mkdir(parents=True)
        run_info_path = run_dir / 'run-info.json'
        with run_info_path.open('w') as stream:
            json.dump(run_info, stream)

    def track_task(self, ids, task_info):
        task_dir = self.directory / 'tasks' / ids.task_id
        task_dir.mkdir(parents=True, exist_ok=True)
        task_info_path = task_dir / 'task-info.json'
        with task_info_path.open('w') as stream:
            json.dump(task_info, stream)

    def track_execution(self, ids, execution_info):
        execution_dir = self.directory / 'tasks' / ids.task_id / ids.execution_id
        execution_dir.mkdir(parents=True, exist_ok=True)
        execution_info_path = execution_dir / 'execution-info.json'
        with execution_info_path.open('w') as stream:
            json.dump(execution_info, stream)

    def track_session(self, ids, session_info):
        self._store_session_info(ids, session_info)
        self._store_session_for_run(ids)

    def track_result(self, ids, result_info):
        self._store_session_result(ids, result_info)

    def track_attachments(self, ids, attachments):
        self._store_session_attachments(ids, attachments)

    def _store_session_for_run(self, ids):
        run_dir = self.directory / 'runs' / ids.run_id
        run_dir.mkdir(parents=True, exist_ok=True)
        run_session_file = run_dir / str(ids)
        run_session_file.touch()

    def _store_session_info(self, ids, session_info):
        session_dir = (
            self.directory / 'tasks' / ids.task_id / ids.execution_id / ids.run_id
        )
        session_dir.mkdir(parents=True, exist_ok=True)
        session_info_file = session_dir / 'session-info.json'
        with session_info_file.open('w') as stream:
            json.dump(session_info, stream)

    def _store_session_result(self, ids, result_info):
        session_dir = (
            self.directory / 'tasks' / ids.task_id / ids.execution_id / ids.run_id
        )
        session_dir.mkdir(parents=True, exist_ok=True)
        session_info_file = session_dir / 'result-info.json'
        with session_info_file.open('w') as stream:
            json.dump(result_info, stream)

    def _store_session_attachments(self, ids, attachments):
        session_dir = (
            self.directory / 'tasks' / ids.task_id / ids.execution_id / ids.run_id
        )
        attachments_dir = session_dir / 'attachments'
        attachments_dir.mkdir(parents=True)
        for name, attachment in attachments.items():
            attachment_path = attachments_dir / name
            with attachment.open() as input_stream:
                with attachment_path.open('wb') as output_stream:
                    shutil.copyfileobj(input_stream, output_stream)
__init__(self, directory) special 🔗

Create a new backend.

Parameters:

Name Type Description Default
directory str

Directory where the tracking data will be stored.

required
Source code in bandsaw/tracking/filesystem.py
def __init__(self, directory):
    """
    Create a new backend.

    Args:
        directory (str): Directory where the tracking data will be stored.
    """
    self.directory = pathlib.Path(directory)
    logger.info("Tracking sessions in directory '%s'", self.directory)
    super().__init__()
track_attachments(self, ids, attachments) 🔗

Track the attachments of a session.

Parameters:

Name Type Description Default
ids bandsaw.session.Ids

Ids where task was first used.

required
attachments bandsaw.session.Attachments

An instance of Attachments which gives access to the files that were attached to a session.

required
Source code in bandsaw/tracking/filesystem.py
def track_attachments(self, ids, attachments):
    self._store_session_attachments(ids, attachments)
track_execution(self, ids, execution_info) 🔗

Track an execution.

Parameters:

Name Type Description Default
ids bandsaw.session.Ids

Ids where task was first used.

required
execution_info Dict[str,Any]

A dictionary containing tracking information for the execution.

required
Source code in bandsaw/tracking/filesystem.py
def track_execution(self, ids, execution_info):
    execution_dir = self.directory / 'tasks' / ids.task_id / ids.execution_id
    execution_dir.mkdir(parents=True, exist_ok=True)
    execution_info_path = execution_dir / 'execution-info.json'
    with execution_info_path.open('w') as stream:
        json.dump(execution_info, stream)
track_result(self, ids, result_info) 🔗

Track the result of a session.

Parameters:

Name Type Description Default
ids bandsaw.session.Ids

Ids where task was first used.

required
result_info Dict[str,Any]

A dictionary containing tracking information for this result.

required
Source code in bandsaw/tracking/filesystem.py
def track_result(self, ids, result_info):
    self._store_session_result(ids, result_info)
track_run(self, ids, run_info) 🔗

Track a run

Parameters:

Name Type Description Default
ids bandsaw.session.Ids

Ids where the run was first used.

required
run_info Dict[str,Any]

A dictionary containing tracking information for this run.

required
Source code in bandsaw/tracking/filesystem.py
def track_run(self, ids, run_info):
    run_dir = self.directory / 'runs' / ids.run_id
    run_dir.mkdir(parents=True)
    run_info_path = run_dir / 'run-info.json'
    with run_info_path.open('w') as stream:
        json.dump(run_info, stream)
track_session(self, ids, session_info) 🔗

Track a session.

Parameters:

Name Type Description Default
ids bandsaw.session.Ids

Ids where task was first used.

required
session_info Dict[str,Any]

A dictionary containing tracking information for this session.

required
Source code in bandsaw/tracking/filesystem.py
def track_session(self, ids, session_info):
    self._store_session_info(ids, session_info)
    self._store_session_for_run(ids)
track_task(self, ids, task_info) 🔗

Track a task.

Parameters:

Name Type Description Default
ids bandsaw.session.Ids

Ids where task was first used.

required
task_info Dict[str,Any]

A dictionary containing tracking information for a task.

required
Source code in bandsaw/tracking/filesystem.py
def track_task(self, ids, task_info):
    task_dir = self.directory / 'tasks' / ids.task_id
    task_dir.mkdir(parents=True, exist_ok=True)
    task_info_path = task_dir / 'task-info.json'
    with task_info_path.open('w') as stream:
        json.dump(task_info, stream)

tracker 🔗

Contains Advice that tracks task executions in a local file system.

TrackerExtension (Extension) 🔗

Advice that tracks task executions and their data in the file system.

Attributes:

Name Type Description
_backend bandsaw.tracking.backend.Backend

The backend implementation to use.

Source code in bandsaw/tracking/tracker.py
class TrackerExtension(Extension):
    """
    Advice that tracks task executions and their data in the file system.

    Attributes:
        _backend (bandsaw.tracking.backend.Backend): The backend implementation to use.
    """

    def __init__(self, backend):
        """
        Advice that tracks task executions and their data in the file system.

        Args:
            backend (bandsaw.tracking.backend.Backend): The backend implementation to
                use.

        Raises:
            TypeError: If `backend` does not inherit from `Backend` base class.
            ValueError: If no backend is given.
        """
        if backend is None:
            raise ValueError("Backend must be set.")
        if not isinstance(backend, Backend):
            raise TypeError("`backend` is not of type `Backend`.")
        self._backend = backend
        logger.info("Tracking sessions using backend '%s'", self._backend)
        self._tracked_runs = set()
        self._tracked_tasks = set()
        self._tracked_executions = set()
        self._tracked_sessions = set()
        self._tracked_results = set()
        self._tracked_attachments = set()
        super().__init__()

    def on_session_created(self, session):
        self._track_run(session)
        self._track_task(session)
        self._track_execution(session)
        self._track_session(session)

    def on_session_finished(self, session):
        self._track_result(session)
        self._track_attachments(session)

    def _track_run(self, session):
        if session.run_id not in self._tracked_runs:
            self._backend.track_run(session.ids, {'id': session.run_id})
            self._tracked_runs.add(session.run_id)

    def _track_task(self, session):
        if session.task.task_id not in self._tracked_tasks:
            self._backend.track_task(session.ids, self._create_task_info(session))
            self._tracked_tasks.add(session.task.task_id)

    def _track_execution(self, session):
        combined_id = session.task.task_id + '_' + session.execution.execution_id
        if combined_id not in self._tracked_executions:
            self._backend.track_execution(
                session.ids, self._create_execution_info(session)
            )
            self._tracked_executions.add(combined_id)

    def _track_session(self, session):
        if session.session_id not in self._tracked_sessions:
            self._backend.track_session(session.ids, self._create_session_info(session))
            self._tracked_sessions.add(session.session_id)

    def _track_result(self, session):
        if session.session_id not in self._tracked_results:
            self._backend.track_result(session.ids, self._create_result_info(session))
            self._tracked_results.add(session.session_id)

    def _track_attachments(self, session):
        if session.session_id not in self._tracked_attachments:
            self._backend.track_attachments(session.ids, session.attachments)
            self._tracked_attachments.add(session.session_id)

    @staticmethod
    def _create_run_info(session):
        run_info = {
            'run': {
                'id': session.run_id,
            },
            'configuration': session.configuration.module_name,
            'distribution_archive': {
                'modules': session.distribution_archive.modules,
                'id': None,  # session.distribution_archive.archive_id,
            },
        }
        return run_info

    @staticmethod
    def _create_task_info(session):
        task_info = {
            'task': {
                'id': session.task.task_id,
                'definition': str(session.task),
                'advice_parameters': session.task.advice_parameters,
            },
        }
        return task_info

    def _create_execution_info(self, session):
        def _argument_infos(task, execution):
            """
            The names of the positional and keyword arguments for this task.

            Returns:
                tuple[List[str],Set[str]]: Tuple containing a list with the names of the
                    positional arguments and a set with the names of the keyword
                    arguments.
            """
            signature = task.signature
            bound_args = signature.bind(*execution.args, **execution.kwargs)
            bound_args.apply_defaults()
            all_infos = []
            for name, value in bound_args.arguments.items():
                info = value_info(value)
                info['name'] = name
                all_infos.append(info)
            return all_infos

        execution_info = self._create_task_info(session)
        execution_info['execution'] = {
            'id': session.execution.execution_id,
            'arguments': _argument_infos(session.task, session.execution),
        }
        return execution_info

    def _create_session_info(self, session):
        tracking_info = self._create_execution_info(session)
        tracking_info.update(self._create_run_info(session))
        tracking_info.update(
            {
                'session': {
                    'id': str(session.session_id),
                },
                'task': {
                    'id': session.task.task_id,
                    'definition': str(session.task),
                    'advice_parameters': session.task.advice_parameters,
                },
            }
        )
        return tracking_info

    def _create_result_info(self, session):
        def _result_value_infos(result_value):
            """
            The names of the positional and keyword arguments for this task.

            Returns:
                tuple[List[str],Set[str]]: Tuple containing a list with the names of the
                    positional arguments and a set with the names of the keyword
                    arguments.
            """
            result_value_infos = []
            if isinstance(result_value, dict):
                for name, value in result_value.items():
                    info = value_info(value)
                    info['key'] = name
                    result_value_infos.append(info)
            elif isinstance(result_value, list):
                for index, value in enumerate(result_value):
                    info = value_info(value)
                    info['index'] = index
                    result_value_infos.append(info)
            else:
                info = value_info(result_value)
                result_value_infos = info
            return result_value_infos

        result = session.result
        result_info = self._create_session_info(session)
        result_info['result'] = {}
        if result.exception:
            result_info['result']['exception'] = type(result.exception).__name__
            result_info['result']['message'] = str(result.exception)
        else:
            result_info['result']['value'] = _result_value_infos(result.value)

        return result_info
__init__(self, backend) special 🔗

Advice that tracks task executions and their data in the file system.

Parameters:

Name Type Description Default
backend bandsaw.tracking.backend.Backend

The backend implementation to use.

required

Exceptions:

Type Description
TypeError

If backend does not inherit from Backend base class.

ValueError

If no backend is given.

Source code in bandsaw/tracking/tracker.py
def __init__(self, backend):
    """
    Advice that tracks task executions and their data in the file system.

    Args:
        backend (bandsaw.tracking.backend.Backend): The backend implementation to
            use.

    Raises:
        TypeError: If `backend` does not inherit from `Backend` base class.
        ValueError: If no backend is given.
    """
    if backend is None:
        raise ValueError("Backend must be set.")
    if not isinstance(backend, Backend):
        raise TypeError("`backend` is not of type `Backend`.")
    self._backend = backend
    logger.info("Tracking sessions using backend '%s'", self._backend)
    self._tracked_runs = set()
    self._tracked_tasks = set()
    self._tracked_executions = set()
    self._tracked_sessions = set()
    self._tracked_results = set()
    self._tracked_attachments = set()
    super().__init__()
on_session_created(self, session) 🔗

Called before bandsaw advises a task.

This is called before any advice is applied.

Parameters:

Name Type Description Default
session bandsaw.session.Session

The new session.

required
Source code in bandsaw/tracking/tracker.py
def on_session_created(self, session):
    self._track_run(session)
    self._track_task(session)
    self._track_execution(session)
    self._track_session(session)
on_session_finished(self, session) 🔗

Called after bandsaw advised a task.

This is called after all advices have been applied and the final result is available.

Parameters:

Name Type Description Default
session bandsaw.session.Session

The session.

required
Source code in bandsaw/tracking/tracker.py
def on_session_finished(self, session):
    self._track_result(session)
    self._track_attachments(session)

user 🔗

Contains functions related to users

get_current_username() 🔗

Returns the name of the user which is currently running the python process.

Returns:

Type Description
str

The name of the user on the local system.

Source code in bandsaw/user.py
def get_current_username():
    """
    Returns the name of the user which is currently running the python process.

    Returns:
        str: The name of the user on the local system.
    """
    return pwd.getpwuid(os.getuid())[0]

Last update: 2021-10-16