API
A library for splitting Python workflows into separate tasks
advice
Contains classes and functions for advising tasks.
Advice
Interface that needs to be implemented by an advice.
The interface is quite simple. One has to implement two methods, `before(session)` and `after(session)`, which are called during the process of advising a task execution. Both take a single argument, `session`, which contains an instance of the class `Session`. This object allows the individual advices to influence the task execution by changing the way the task is called or by making changes to the result.
Source code in bandsaw/advice.py
class Advice:
    """
    Interface that needs to be implemented by an advice.

    The interface is quite simple. One has to implement two different methods,
    `before(session)` and `after(session)`, that are called during the process of
    advising a task execution. Both take a single argument `session` which contains an
    instance of the class `Session`. This object allows the individual advices to
    influence the task execution by changing the way the task is being called or
    making changes to the result.
    """

    def before(self, session):  # pylint: disable=R0201 # no-self-use
        """
        Called before the task is actually executed.

        This method allows the individual advice to make changes to the way the
        task is later executed. In order to continue, the advice MUST either
        call `session.proceed()`, which will continue the process with the next
        advice in the advice chain, or call `session.conclude(result)` with a `Result`
        instance, which will skip the following advices and return without executing
        the task at all.

        The default implementation will just call `session.proceed()`.

        Args:
            session (bandsaw.session.Session): The session of the execution.
        """
        session.proceed()

    def after(self, session):  # pylint: disable=R0201 # no-self-use
        """
        Called after the task is actually executed.

        This method allows the individual advice to make changes to the result of
        the task execution. The result can be retrieved from the `session`.
        In order to continue, the advice MUST either call `session.proceed()`, which
        will continue the process with the current `result` and the next advice in the
        advice chain, or call `session.conclude(result)` with a `Result` instance,
        which will set a different result and continue with it.

        The default implementation will just call `session.proceed()`.

        Args:
            session (bandsaw.session.Session): The session of the execution.
        """
        session.proceed()
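As an illustration of the interface, here is a minimal sketch of a custom advice that stores a start time in `session.context` in `before()` and records the elapsed time in `after()`, calling `session.proceed()` in both. So that it runs without bandsaw installed, `Advice` and `FakeSession` below are simplified stand-ins (hypothetical, not part of bandsaw); in real code you would subclass `bandsaw.advice.Advice` and receive a real `Session`. The `timing.start` and `timing.elapsed` context keys are invented for this example.

```python
import time


class Advice:
    """Stand-in mirroring the documented default behavior of bandsaw's Advice."""

    def before(self, session):
        session.proceed()

    def after(self, session):
        session.proceed()


class FakeSession:
    """Hypothetical test double: a context dict plus a proceed() counter."""

    def __init__(self):
        self.context = {}
        self.proceed_calls = 0

    def proceed(self):
        self.proceed_calls += 1


class TimingAdvice(Advice):
    """Measures the wall-clock time between before() and after()."""

    def before(self, session):
        session.context['timing.start'] = time.perf_counter()
        session.proceed()

    def after(self, session):
        start = session.context.pop('timing.start')
        session.context['timing.elapsed'] = time.perf_counter() - start
        session.proceed()


session = FakeSession()
advice = TimingAdvice()
advice.before(session)
advice.after(session)
print(session.proceed_calls, session.context['timing.elapsed'] >= 0)  # 2 True
```

Using `session.context` rather than an attribute on the advice keeps the state per session, so a single advice instance can serve several sessions.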
after(self, session)
Called after the task is actually executed.
This method allows the individual advice to make changes to the result of the task execution. The result can be retrieved from the `session`.
In order to continue, the advice MUST either call `session.proceed()`, which continues the process with the current `result` and the next advice in the advice chain, or call `session.conclude(result)` with a `Result` instance, which sets a different result and continues with it.
The default implementation just calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
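The two exits from `after()` can be sketched as follows: keep the current result with `proceed()`, or substitute a new one with `conclude(result)`. `Result` is a hypothetical stand-in with a single `value` field, since the constructor of `bandsaw.result.Result` is not documented in this section, and `FakeSession` is a test double that only records which exit was taken.

```python
from dataclasses import dataclass


@dataclass
class Result:
    """Hypothetical stand-in for bandsaw.result.Result."""
    value: object


class FakeSession:
    """Test double recording how an advice left after()."""

    def __init__(self, result):
        self.result = result
        self.exit = None

    def proceed(self):
        self.exit = 'proceed'

    def conclude(self, result):
        self.result = result
        self.exit = 'conclude'


class RoundingAdvice:
    """Rounds float results to two decimals; leaves other results untouched."""

    def after(self, session):
        value = session.result.value
        if isinstance(value, float):
            # Replace the result and continue with the new one.
            session.conclude(Result(round(value, 2)))
        else:
            # Keep the current result.
            session.proceed()


float_session = FakeSession(Result(3.14159))
RoundingAdvice().after(float_session)
text_session = FakeSession(Result("done"))
RoundingAdvice().after(text_session)
print(float_session.exit, float_session.result.value)  # conclude 3.14
print(text_session.exit, text_session.result.value)  # proceed done
```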
before(self, session)
Called before the task is actually executed.
This method allows the individual advice to change the way the task is later executed. In order to continue, the advice MUST either call `session.proceed()`, which continues the process with the next advice in the advice chain, or call `session.conclude(result)` with a `Result` instance, which skips the following advices and returns without executing the task at all.
The default implementation just calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
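Concluding in `before()` short-circuits the chain. The sketch below skips any task whose `task_id` is on a deny list; `SkipTasksAdvice` is an invented name, not a bandsaw class, and `Result` and `FakeSession` are hypothetical stand-ins that let the example run on its own.

```python
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass
class Result:
    """Hypothetical stand-in for bandsaw.result.Result."""
    value: object


class FakeSession:
    """Test double recording how an advice left before()."""

    def __init__(self, task_id):
        self.task = SimpleNamespace(task_id=task_id)
        self.result = None
        self.exit = None

    def proceed(self):
        self.exit = 'proceed'

    def conclude(self, result):
        self.result = result
        self.exit = 'conclude'


class SkipTasksAdvice:
    """Concludes listed tasks immediately, so they are never executed."""

    def __init__(self, skipped_task_ids):
        self._skipped = set(skipped_task_ids)

    def before(self, session):
        if session.task.task_id in self._skipped:
            # Skips the remaining advices and the task itself.
            session.conclude(Result(None))
            return
        session.proceed()


advice = SkipTasksAdvice({'task-b'})
first, second = FakeSession('task-a'), FakeSession('task-b')
advice.before(first)
advice.before(second)
print(first.exit, second.exit)  # proceed conclude
```

Note the explicit `return` after `conclude()`: an advice must take exactly one of the two exits.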
advise_task_with_chain(task, execution, configuration, advice_chain='default')
Executes a `Task` with additional advices.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | bandsaw.tasks.Task | The task to be executed. | required |
execution | bandsaw.execution.Execution | The execution definition for the task. | required |
configuration | bandsaw.config.Configuration | The configuration which should be used during advising. | required |
advice_chain | str | The name of the advice chain which contains the additional advices to be applied to the task. Defaults to 'default'. | 'default' |
Returns:
Type | Description |
---|---|
bandsaw.result.Result | The result of the task execution. |
Source code in bandsaw/advice.py
def advise_task_with_chain(task, execution, configuration, advice_chain='default'):
    """
    Executes a `Task` with additional advices.

    Args:
        task (bandsaw.tasks.Task): The task to be executed.
        execution (bandsaw.execution.Execution): The execution definition for the task.
        configuration (bandsaw.config.Configuration): The configuration which should
            be used during advising.
        advice_chain (str): The name of the advice chain which contains the additional
            advices to be applied to the task. Defaults to 'default'.

    Returns:
        bandsaw.result.Result: The result of the task execution.
    """
    session = Session(task, execution, configuration, advice_chain)
    return session.initiate()
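`advise_task_with_chain` delegates all of the work to `Session.initiate()`. To make the documented `proceed()`/`conclude()` contract concrete, here is a deliberately simplified, hypothetical model of walking an advice chain. It is not bandsaw's `Session`: it inspects each advice's exit only after the call returns, and it calls `after()` in chain order, which is an assumption rather than documented behavior.

```python
from dataclasses import dataclass


@dataclass
class Result:
    """Hypothetical stand-in for bandsaw.result.Result."""
    value: object


class ToySession:
    """Simplified stand-in for a session; records how an advice exited."""

    def __init__(self):
        self.context = {}
        self.result = None
        self.exit = None

    def proceed(self):
        self.exit = 'proceed'

    def conclude(self, result):
        self.result = result
        self.exit = 'conclude'


def run_chain(advices, task):
    """Walk an advice chain using the documented proceed/conclude rules."""
    session = ToySession()
    for advice in advices:
        session.exit = None
        advice.before(session)
        if session.exit == 'conclude':
            # Skip the remaining advices and the task itself.
            return session.result
        if session.exit is None:
            raise RuntimeError(f"{advice!r} neither proceeded nor concluded")
    session.result = Result(task())
    # Assumption: after() runs in chain order here; the real ordering may differ.
    for advice in advices:
        session.exit = None
        advice.after(session)  # conclude() here just replaces session.result
        if session.exit is None:
            raise RuntimeError(f"{advice!r} neither proceeded nor concluded")
    return session.result


class PassThrough:
    """Behaves like the default Advice: always proceeds."""

    def before(self, session):
        session.proceed()

    def after(self, session):
        session.proceed()


class ShortCircuit(PassThrough):
    """Concludes in before(), so the task never runs."""

    def before(self, session):
        session.conclude(Result('from-advice'))


calls = []


def task():
    calls.append('ran')
    return 42


print(run_chain([PassThrough()], task).value)  # 42
print(run_chain([ShortCircuit(), PassThrough()], task).value)  # from-advice
print(calls)  # ['ran']
```

The second run never calls the task: a short-circuiting advice returns its own result, which is the mechanism `CachingAdvice` relies on.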
advices
Package that contains reusable `Advice` classes.
cache
Contains Advice that can cache task results in a local file system.
CachingAdvice (Advice)
Advice that caches results in a local filesystem.
Attributes:
Name | Type | Description |
---|---|---|
directory | Path | The path to the directory where the results are cached. |
Source code in bandsaw/advices/cache.py
class CachingAdvice(Advice):
    """
    Advice that caches results in a local filesystem.

    Attributes:
        directory (Path): The path to the directory where the results are cached.
    """

    def __init__(self, directory):
        self.directory = pathlib.Path(directory)
        logger.info("Caching artifacts in storage '%s'", self.directory)
        super().__init__()

    def before(self, session):
        artifact_id = session.task.task_id
        revision_id = session.execution.execution_id
        cache_item_path = self.directory / artifact_id / revision_id
        session.context['cache-item-path'] = str(cache_item_path)
        if cache_item_path.exists():
            logger.info("Using result from cache '%s'", cache_item_path)
            with open(cache_item_path, 'rb') as stream:
                result = session.serializer.deserialize(stream)
            session.conclude(result)
            return
        session.proceed()

    def after(self, session):
        cache_item_path = pathlib.Path(session.context['cache-item-path'])
        if not cache_item_path.exists():
            cache_item_directory = cache_item_path.parent
            if not cache_item_directory.exists():
                cache_item_directory.mkdir(parents=True)
            logger.info("Storing result in cache '%s'", cache_item_path)
            with open(cache_item_path, 'wb') as stream:
                session.serializer.serialize(session.result, stream)
        session.proceed()
after(self, session)
Called after the task is actually executed.
This method allows the individual advice to make changes to the result of the task execution. The result can be retrieved from the `session`.
In order to continue, the advice MUST either call `session.proceed()`, which continues the process with the current `result` and the next advice in the advice chain, or call `session.conclude(result)` with a `Result` instance, which sets a different result and continues with it.
The default implementation just calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
before(self, session)
Called before the task is actually executed.
This method allows the individual advice to change the way the task is later executed. In order to continue, the advice MUST either call `session.proceed()`, which continues the process with the next advice in the advice chain, or call `session.conclude(result)` with a `Result` instance, which skips the following advices and returns without executing the task at all.
The default implementation just calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
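`CachingAdvice` keys each cached result by task and execution: the file lives at `directory / task_id / execution_id`, `after()` writes it only if it does not exist yet, and a later `before()` that finds it concludes the session with the deserialized result instead of running the task. The sketch reproduces that layout with the standard library only; `pickle` stands in for `session.serializer` (whose format this section does not specify), and the helper functions are hypothetical.

```python
import pathlib
import pickle
import tempfile


def cache_item_path(directory, task_id, execution_id):
    """Same layout as CachingAdvice: <directory>/<task_id>/<execution_id>."""
    return pathlib.Path(directory) / task_id / execution_id


def load_cached(path):
    """Return the cached value, or None on a miss (pickle: trusted files only)."""
    if not path.exists():
        return None
    with open(path, 'rb') as stream:
        return pickle.load(stream)


def store_once(path, value):
    """Write the value unless an entry already exists, like CachingAdvice.after()."""
    if path.exists():
        return
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, 'wb') as stream:
        pickle.dump(value, stream)


with tempfile.TemporaryDirectory() as tmp:
    path = cache_item_path(tmp, 'my-task', 'exec-1')
    miss = load_cached(path)
    store_once(path, {'answer': 42})
    store_once(path, {'answer': 0})  # ignored: the entry already exists
    hit = load_cached(path)
    relative = path.relative_to(tmp).as_posix()

print(miss, hit, relative)  # None {'answer': 42} my-task/exec-1
```

Because the execution id is part of the path, a changed execution definition yields a new cache entry rather than overwriting the old one.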
log
Contains an `Advice` implementation which adds logging.
JsonFormatter (Formatter)
Formatter that formats log records into a JSON string.
Source code in bandsaw/advices/log.py
class JsonFormatter(logging.Formatter):
    """
    Formatter that formats log records into a JSON string.
    """

    def format(self, record):
        timestamp = datetime.datetime.fromtimestamp(
            record.created,
            datetime.timezone.utc,
        )
        log_item = {
            "timestamp": datetime.datetime.isoformat(timestamp),
            "logger": record.name,
            "level": record.levelname,
            # getMessage() merges the arguments into the message; `record.message`
            # is only set by logging.Formatter.format(), which is never called here.
            "message": record.getMessage(),
            "threadId": record.thread,
            "threadName": record.threadName,
            "processId": record.process,
            "processName": record.processName,
            "module": record.module,
            "function": record.funcName,
            "path": record.pathname,
            "line_no": record.lineno,
        }
        if record.exc_info is not None:
            log_item.update(
                {
                    'traceback': ''.join(
                        traceback.format_tb(record.exc_info[2])
                    ).strip(),
                    'exception': traceback.format_exception_only(*record.exc_info[:2])[
                        0
                    ].strip(),
                }
            )
        if hasattr(record, 'session'):
            log_item.update(
                {
                    'sessionId': record.session.session_id,
                    'runId': record.session.run_id,
                    'taskId': record.session.task.task_id,
                    'executionId': record.session.execution.execution_id,
                }
            )
        return json.dumps(log_item)
format(self, record)
Format the specified record as a JSON string.
The JSON object contains the timestamp (UTC, ISO 8601), the logger name, the level, the message, the thread and process ids and names, the module, the function, the path and the line number. If the record carries exception information, a `traceback` and an `exception` field are added; if it has a `session` attribute, `sessionId`, `runId`, `taskId` and `executionId` are added as well.
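Because `JsonFormatter` is an ordinary `logging.Formatter` subclass, it can be attached to any standard-library handler, producing one JSON object per log line. The sketch shows that wiring with a trimmed-down, hypothetical `MiniJsonFormatter` that emits only four of the fields, so it runs without bandsaw; with bandsaw installed you would pass `bandsaw.advices.log.JsonFormatter()` to `setFormatter()` instead.

```python
import datetime
import io
import json
import logging


class MiniJsonFormatter(logging.Formatter):
    """Hypothetical, trimmed-down variant of JsonFormatter (four fields only)."""

    def format(self, record):
        timestamp = datetime.datetime.fromtimestamp(
            record.created, datetime.timezone.utc
        )
        return json.dumps({
            "timestamp": timestamp.isoformat(),
            "logger": record.name,
            "level": record.levelname,
            "message": record.getMessage(),  # merges msg with its %-args
        })


buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(MiniJsonFormatter())

demo_logger = logging.getLogger('json-demo')
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False  # keep the demo out of the root handlers
demo_logger.addHandler(handler)

demo_logger.info("Stored %d items", 3)
entry = json.loads(buffer.getvalue())
print(entry['level'], entry['message'])  # INFO Stored 3 items
```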
LoggingAdvice (Advice)
An `Advice` which adds additional logging.
Source code in bandsaw/advices/log.py
class LoggingAdvice(Advice):
    """An Advice which adds additional logging"""

    def __init__(self, level=None, formatter=None):
        """
        Create a new instance of the `LoggingAdvice`.

        Args:
            level (int): The log level of the messages to keep. If `None` the level is
                defined by the root logger. Defaults to `None`.
            formatter (logging.Formatter): Formatter to use for writing out the
                individual log messages. Defaults to `JsonFormatter`.
        """
        self._level = level
        if formatter is None:
            formatter = JsonFormatter()
        self._formatter = formatter

    def before(self, session):
        session_log_file_path = session.temp_dir / 'session.log'
        file_handler = logging.FileHandler(
            filename=str(session_log_file_path.absolute()),
        )
        file_handler.set_name('Handler-' + session.session_id)
        if self._level is not None:
            file_handler.setLevel(self._level)
        file_handler.setFormatter(self._formatter)
        session_filter = _SessionFilter(session)
        file_handler.addFilter(session_filter)
        logging.root.addHandler(file_handler)
        logger.info(
            "BEFORE %s:%s with context %s",
            session.task.task_id,
            session.execution.execution_id,
            session.context,
        )
        session.proceed()

    def after(self, session):
        logger.info(
            "AFTER %s:%s with context %s",
            session.task.task_id,
            session.execution.execution_id,
            session.context,
        )
        # Iterate over a copy, because removeHandler() mutates the handler list.
        for handler in list(logging.root.handlers):
            if handler.get_name() == 'Handler-' + session.session_id:
                logging.root.removeHandler(handler)
                # close() flushes the handler and releases the log file.
                handler.close()
        session.attachments['session.log'] = session.temp_dir / 'session.log'
        session.proceed()
__init__(self, level=None, formatter=None)
Create a new instance of the `LoggingAdvice`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
level | int | The log level of the messages to keep. If `None`, the level is defined by the root logger. Defaults to `None`. | None |
formatter | logging.Formatter | Formatter to use for writing out the individual log messages. Defaults to `JsonFormatter`. | None |
after(self, session)
Called after the task is actually executed.
This method allows the individual advice to make changes to the result of the task execution. The result can be retrieved from the `session`.
In order to continue, the advice MUST either call `session.proceed()`, which continues the process with the current `result` and the next advice in the advice chain, or call `session.conclude(result)` with a `Result` instance, which sets a different result and continues with it.
The default implementation just calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
before(self, session)
Called before the task is actually executed.
This method allows the individual advice to change the way the task is later executed. In order to continue, the advice MUST either call `session.proceed()`, which continues the process with the next advice in the advice chain, or call `session.conclude(result)` with a `Result` instance, which skips the following advices and returns without executing the task at all.
The default implementation just calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
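`LoggingAdvice` scopes a log file to one session: `before()` adds a `FileHandler` named `Handler-<session_id>` to the root logger, and `after()` removes the handler with that name and attaches `session.log` to the session. The sketch reproduces that lifecycle with the standard library. `SessionIdFilter`, `attach` and `detach` are hypothetical; `SessionIdFilter` stands in for the private `_SessionFilter`, whose code is not shown here (the `hasattr(record, 'session')` check in `JsonFormatter` suggests it attaches the session to each record).

```python
import logging
import pathlib
import tempfile


class SessionIdFilter(logging.Filter):
    """Hypothetical filter that tags every record with a session id."""

    def __init__(self, session_id):
        super().__init__()
        self.session_id = session_id

    def filter(self, record):
        record.session_id = self.session_id
        return True  # keep every record, just annotate it


def attach(log_path, session_id):
    """Add a uniquely named file handler to the root logger."""
    handler = logging.FileHandler(str(log_path))
    handler.set_name('Handler-' + session_id)
    handler.setFormatter(logging.Formatter('%(session_id)s %(message)s'))
    handler.addFilter(SessionIdFilter(session_id))
    logging.root.addHandler(handler)


def detach(session_id):
    """Remove the handler by name and release its file."""
    # Iterate over a copy, because removeHandler() mutates the list.
    for handler in list(logging.root.handlers):
        if handler.get_name() == 'Handler-' + session_id:
            logging.root.removeHandler(handler)
            handler.close()  # flushes and closes the log file


logging.root.setLevel(logging.INFO)
demo_logger = logging.getLogger('session-demo')
with tempfile.TemporaryDirectory() as tmp:
    log_path = pathlib.Path(tmp) / 'session.log'
    attach(log_path, 's-1')
    demo_logger.info("inside the session")
    detach('s-1')
    demo_logger.info("after the session")
    lines = log_path.read_text().splitlines()

print(lines)  # ['s-1 inside the session']
```

Naming the handler after the session id is what lets `after()` find and remove exactly this handler, even when other handlers are attached to the root logger.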
metrics
Contains an `Advice` implementation which gathers metrics.
MetricsAdvice (Advice)
An `Advice` which gathers metrics.
Under the hood, this advice uses the Python `multimeter` library for collecting the metrics.
Source code in bandsaw/advices/metrics.py
class MetricsAdvice(Advice):
    """An Advice which gathers metrics.

    Under the hood, this advice uses the Python `multimeter` library for collecting
    the metrics.
    """

    def __init__(self, meter, directory=None, file_format=multimeter.JsonFormat()):
        """
        Creates a new MetricsAdvice that gathers metrics.

        Args:
            meter (multimeter.Multimeter): The Multimeter instance which is used for
                gathering the metrics.
            directory (str): Path to a directory, where the metrics are temporarily
                stored. If `None` or omitted, the session temporary directory is used.
            file_format (multimeter.FileFormat): File format that defines the format
                in which the gathered metrics are stored. Defaults to
                `multimeter.JsonFormat`.
        """
        self._multimeter = meter
        if directory is not None:
            self._directory = pathlib.Path(directory)
        else:
            self._directory = None
        self._file_format = file_format

    def before(self, session):
        tags = {
            'run_id': session.run_id,
            'task_id': session.task.task_id,
            'execution_id': session.execution.execution_id,
            'session_id': session.session_id,
        }
        advice_parameters = session.task.advice_parameters
        additional_tags = advice_parameters.get('metrics', {}).get('tags', {})
        tags.update(additional_tags)
        logger.info("Measurement id %s with tags %s", session.session_id, tags)
        measurement = self._multimeter.measure(session.session_id, **tags)
        session.context['metrics.measurement'] = measurement
        logger.debug("Measurement start")
        measurement.start()
        session.proceed()

    def after(self, session):
        measurement = session.context.pop('metrics.measurement')
        logger.debug("Measurement end")
        measurement.end()
        directory = self._directory or session.temp_dir
        storage = multimeter.FileStorage(directory, self._file_format)
        storage.store(measurement.result)
        metrics_file_name = measurement.identifier + self._file_format.extension
        metrics_attachment_name = 'metrics' + self._file_format.extension
        session.attachments[metrics_attachment_name] = directory / metrics_file_name
        session.proceed()
__init__(self, meter, directory=None, file_format=multimeter.JsonFormat())
Creates a new MetricsAdvice that gathers metrics.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
meter | multimeter.Multimeter | The Multimeter instance which is used for gathering the metrics. | required |
directory | str | Path to a directory where the metrics are temporarily stored. If `None` or omitted, the session temporary directory is used. | None |
file_format | multimeter.FileFormat | File format that defines the format in which the gathered metrics are stored. Defaults to `multimeter.JsonFormat`. | multimeter.JsonFormat() |
after(self, session)
Called after the task is actually executed.
This method allows the individual advice to make changes to the result of the task execution. The result can be retrieved from the `session`.
In order to continue, the advice MUST either call `session.proceed()`, which continues the process with the current `result` and the next advice in the advice chain, or call `session.conclude(result)` with a `Result` instance, which sets a different result and continues with it.
The default implementation just calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
before(self, session)
Called before the task is actually executed.
This method allows the individual advice to change the way the task is later executed. In order to continue, the advice MUST either call `session.proceed()`, which continues the process with the next advice in the advice chain, or call `session.conclude(result)` with a `Result` instance, which skips the following advices and returns without executing the task at all.
The default implementation just calls `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
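`MetricsAdvice.before()` tags every measurement with the run, task, execution and session ids and then merges any tags found under `advice_parameters['metrics']['tags']`. A pure-function sketch of that merge (the function name is hypothetical) makes the precedence visible: because of `dict.update`, a task-supplied tag overrides a default tag with the same key.

```python
def build_tags(run_id, task_id, execution_id, session_id, advice_parameters):
    """Mirror the tag handling of MetricsAdvice.before()."""
    tags = {
        'run_id': run_id,
        'task_id': task_id,
        'execution_id': execution_id,
        'session_id': session_id,
    }
    # Missing 'metrics' or 'tags' keys simply mean "no extra tags".
    additional_tags = advice_parameters.get('metrics', {}).get('tags', {})
    tags.update(additional_tags)  # task-supplied tags win on key clashes
    return tags


tags = build_tags(
    'run-1', 'train', 'exec-7', 'sess-3',
    {'metrics': {'tags': {'team': 'ml', 'run_id': 'custom'}}},
)
print(tags['team'], tags['run_id'], tags['task_id'])  # ml custom train
print(build_tags('r', 't', 'e', 's', {}))
```

How a task populates `advice_parameters` is not covered in this section; the nested `metrics` and `tags` keys are taken directly from the code above.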
ssh
Contains code for running tasks remotely via SSH
Remote
Definition of a remote machine.
Remotes are used for executing sessions on remote machines.
Attributes:
Name | Type | Description |
---|---|---|
host | str | The hostname of the machine where this interpreter is run. |
port | int | The port where ssh is listening for connections. Defaults to 22. |
user | str | The name of the user under which the interpreter will run. Defaults to the name of the local user. |
key_file | str | Path to a file containing the key to use for authentication. |
interpreter | bandsaw.interpreter.Interpreter | The interpreter which should be used, including its executable and Python path. |
directory | str | Remote directory where temporary files are stored. If `None`, defaults to '/tmp'. |
Source code in bandsaw/advices/ssh.py
class Remote:  # pylint: disable=too-few-public-methods
    """
    Definition of a remote machine.

    Remotes are used for executing sessions on remote machines.

    Attributes:
        host (str): The hostname of the machine, where this interpreter is run.
        port (int): The port where ssh is listening for connections. Defaults to 22.
        user (str): The name of the user, under which the interpreter will run.
            Defaults to the name of the local user.
        key_file (str): Path to a file containing the key to use for authentication.
        interpreter (bandsaw.interpreter.Interpreter): The interpreter which should be
            used, including its executable and python path.
        directory (str): Remote directory where temporary files are stored. If `None`,
            defaults to '/tmp'.
    """

    def __init__(
        self,
        host=None,
        port=None,
        user=None,
        key_file=None,
        interpreter=None,
        directory=None,
    ):  # pylint: disable=too-many-arguments
        if host is None:
            raise ValueError("Remote needs a host, `None` given.")
        self.host = host
        self.port = port or 22
        self.user = user or get_current_username()
        self.key_file = key_file
        if interpreter is None:
            raise ValueError("Remote needs an interpreter, `None` given.")
        self.interpreter = interpreter
        self.directory = pathlib.Path(directory or '/tmp')
SshAdvice (Advice)
Advice that moves and proceeds a session on a remote machine via SSH
Source code in bandsaw/advices/ssh.py
class SshAdvice(Advice):
    """Advice that moves and proceeds a session on a remote machine via SSH"""

    def __init__(self, directory=None, backend=SshCommandLineBackend()):
        """
        Create a new instance.

        Args:
            directory (str): The local directory where temporary files are stored to
                exchange data between the local and the remote machine. If `None`, the
                temporary directory from the session is used.
            backend (bandsaw.advices.ssh.SshCommandLineBackend): The backend that
                performs the remote operations (creating and deleting directories,
                copying files, running the interpreter). Defaults to
                `SshCommandLineBackend()`.
        """
        if directory is None:
            self.directory = None
            logger.info("Using session temporary directory for exchange data")
        else:
            self.directory = pathlib.Path(directory)
            logger.info("Using directory %s for exchange data", self.directory)
        self.remotes = {}
        self._backend = backend
        super().__init__()

    def add_remote(self, remote, name='default'):
        """
        Add a new definition of a remote machine.

        Args:
            remote (bandsaw.advices.ssh.Remote): Definition of the remote machine.
            name (str): Name of the remote. Defaults to `default`.

        Returns:
            bandsaw.advices.ssh.SshAdvice: The advice with the added remote.
        """
        self.remotes[name] = remote
        return self

    def before(self, session):
        temp_dir = self.directory or session.temp_dir
        session_in_path = temp_dir / f'session-{session.session_id}-in.zip'
        session_out_path = temp_dir / f'session-{session.session_id}-out.zip'
        logger.info("Writing session to %s", session_in_path)
        with io.FileIO(str(session_in_path), mode='w') as stream:
            session.save(stream)
        parameters = session.task.advice_parameters
        remote_name = parameters.get('ssh', {}).get('remote', 'default')
        remote = self.remotes[remote_name]
        remote_run_directory = remote.directory / session.execution.execution_id
        logger.info(
            "Creating run directory %s on host %s",
            remote_run_directory,
            remote.host,
        )
        self._backend.create_dir(
            remote,
            remote_run_directory,
        )
        logger.info("Copying over distribution archive to host %s", remote.host)
        distribution_archive_path = session.distribution_archive.path
        remote_distribution_archive_path = (
            remote_run_directory / distribution_archive_path.name
        )
        self._backend.copy_file_to_remote(
            remote,
            distribution_archive_path,
            remote_distribution_archive_path,
        )
        logger.info("Copying over session to host %s", remote.host)
        remote_session_in_path = remote_run_directory / session_in_path.name
        self._backend.copy_file_to_remote(
            remote,
            session_in_path,
            remote_session_in_path,
        )
        remote_session_out_path = remote_run_directory / session_out_path.name
        logger.info(
            "Running remote process using interpreter %s", remote.interpreter.executable
        )
        self._backend.execute_remote(
            remote,
            remote.interpreter.executable,
            str(remote_distribution_archive_path),
            '--input',
            str(remote_session_in_path),
            '--output',
            str(remote_session_out_path),
            '--run-id',
            session.run_id,
        )
        # environment = self.interpreter.environment
        # environment['PYTHONPATH'] = ':'.join(self.interpreter.path)
        logger.info("Remote process exited")
        logger.info("Copying over session result from host %s", remote.host)
        self._backend.copy_file_from_remote(
            remote,
            remote_session_out_path,
            session_out_path,
        )
        logger.info(
            "Cleaning up remote directory %s on host %s",
            remote_run_directory,
            remote.host,
        )
        self._backend.delete_dir(remote, remote_run_directory)
        logger.info("Restore local session from %s", session_out_path)
        # Close the file again before it is deleted below.
        with io.FileIO(str(session_out_path), mode='r') as stream:
            session.restore(stream)
        logger.info(
            "Cleaning up local sessions %s, %s",
            session_in_path,
            session_out_path,
        )
        session_in_path.unlink()
        session_out_path.unlink()
        logger.info("Proceed local session")
        session.proceed()

    def after(self, session):
        logger.info("after called in process %d", os.getpid())
        logger.info("Remote process created result %s", session.result)
        logger.info("Returning to end remote session and continue in parent")
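In `before()`, `SshAdvice` writes the session to `session-<session_id>-in.zip`, creates a run directory named after the execution id below `remote.directory`, copies the distribution archive and the session file there, runs the remote interpreter on the archive, copies `session-<session_id>-out.zip` back, deletes the remote run directory, restores the session from the returned file and proceeds locally. The sketch below only computes the remote paths and the argument list handed to `execute_remote()`; `plan_remote_run` and the archive name `dist.pyz` are hypothetical. It uses `PurePosixPath` on the assumption that the remote host is POSIX-like (its default directory is '/tmp'), whereas the source uses `pathlib.Path`.

```python
from pathlib import PurePosixPath


def plan_remote_run(remote_dir, execution_id, session_id, archive_name,
                    executable, run_id):
    """Compute the remote run directory and command used by SshAdvice.before()."""
    run_dir = PurePosixPath(remote_dir) / execution_id
    session_in = run_dir / f'session-{session_id}-in.zip'
    session_out = run_dir / f'session-{session_id}-out.zip'
    archive = run_dir / archive_name
    command = [
        executable,
        str(archive),
        '--input', str(session_in),
        '--output', str(session_out),
        '--run-id', run_id,
    ]
    return run_dir, command


run_dir, command = plan_remote_run(
    '/tmp', 'exec-7', 'sess-3', 'dist.pyz', '/usr/bin/python3', 'run-1')
print(run_dir)  # /tmp/exec-7
print(' '.join(command))
```

Keying the run directory by execution id keeps concurrent executions on the same remote from overwriting each other's files.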
__init__(self, directory=None, backend=SshCommandLineBackend())
Create a new instance.
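Parameters:
Name | Type | Description | Default |
---|---|---|---|
directory | str | The local directory where temporary files are stored to exchange data between the local and the remote machine. If `None`, the temporary directory from the session is used. | None |
backend | bandsaw.advices.ssh.SshCommandLineBackend | The backend that performs the remote operations (creating and deleting directories, copying files, running the interpreter). | SshCommandLineBackend() |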
add_remote(self, remote, name='default')
Add a new definition of a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Definition of the remote machine. | required |
name | str | Name of the remote. Defaults to `default`. | 'default' |
Returns:
Type | Description |
---|---|
bandsaw.advices.ssh.SshAdvice | The advice with the added remote. |
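Because `add_remote()` returns the advice itself, calls can be chained, and `before()` selects a remote by the name stored under `advice_parameters['ssh']['remote']`, falling back to `'default'`. The stand-in `RemoteRegistry` below (hypothetical, with plain strings in place of `Remote` objects) mirrors just those two pieces of behavior.

```python
class RemoteRegistry:
    """Hypothetical stand-in for the remote handling of SshAdvice."""

    def __init__(self):
        self.remotes = {}

    def add_remote(self, remote, name='default'):
        self.remotes[name] = remote
        return self  # returning self is what makes chaining possible

    def select(self, advice_parameters):
        """Same lookup as SshAdvice.before(): 'ssh' -> 'remote', else 'default'."""
        name = advice_parameters.get('ssh', {}).get('remote', 'default')
        return self.remotes[name]  # unknown names raise KeyError


registry = (
    RemoteRegistry()
    .add_remote('cpu-box.example.com')
    .add_remote('gpu-box.example.com', name='gpu')
)
print(registry.select({}))  # cpu-box.example.com
print(registry.select({'ssh': {'remote': 'gpu'}}))  # gpu-box.example.com
```

With bandsaw itself, the equivalent chain would look like `SshAdvice().add_remote(Remote(host=..., interpreter=...), name='gpu')`; how the interpreter is constructed is outside the scope of this section.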
after(self, session)
Called after the task is actually executed.
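This method allows the individual advice to make changes to the result of
the task execution. The result can be retrieved from the `session`.
In order to continue, the advice MUST either call `session.proceed()`, which
will continue the process with the current result and the next advice in the
advice chain, or call `session.conclude(result)` with a `Result` instance,
which will set a different result and continue with it.
The default implementation will just call `session.proceed()`. The `SshAdvice`
implementation only logs the result and returns without calling either method;
judging by its log messages, this ends the session in the remote process, and
the parent process continues once `before()` has restored the session.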
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
Source code in bandsaw/advices/ssh.py
def after(self, session):
logger.info("after called in process %d", os.getpid())
logger.info("Remote process created result %s", session.result)
logger.info("Returning to end remote session and continue in parent")
before(self, session)
Called before the task is actually executed.
This method allows the individual advice to change the way the task
execution is later carried out. In order to continue, the advice MUST either
call `session.proceed()`, which will continue the process with the next
advice in the advice chain, or call `session.conclude(result)` with a `Result`
instance, which will skip the following advices and return without executing
the task at all.
The default implementation will just call `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
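The `before()`/`after()` contract described above can be illustrated with a small, self-contained sketch of an advice that skips the task when a result is already cached. `FakeSession` and its `key` attribute are hypothetical stand-ins for `bandsaw.session.Session`; the real `session.conclude()` expects a `Result` instance rather than a plain value, and a real advice would subclass `bandsaw.advice.Advice`.

```python
class FakeSession:
    """Hypothetical stand-in for bandsaw.session.Session that records the outcome."""

    def __init__(self, key):
        self.key = key        # hypothetical cache key, not a real Session attribute
        self.outcome = None
        self.result = None

    def proceed(self):
        self.outcome = 'proceed'

    def conclude(self, result):
        self.outcome = 'conclude'
        self.result = result


class CachingAdvice:
    """Sketch of an advice that skips the task when a result is already cached."""

    def __init__(self):
        self.cache = {}

    def before(self, session):
        if session.key in self.cache:
            # Skip the remaining advices and the task execution itself
            session.conclude(self.cache[session.key])
        else:
            # Continue with the next advice in the chain
            session.proceed()

    def after(self, session):
        self.cache[session.key] = session.result
        session.proceed()  # keep the current result


advice = CachingAdvice()
first = FakeSession('task-1')
advice.before(first)   # nothing cached yet
first.result = 42      # pretend the task ran and produced 42
advice.after(first)
second = FakeSession('task-1')
advice.before(second)  # served from the cache
print(first.outcome, second.outcome, second.result)  # proceed conclude 42
```

Every code path ends in exactly one call to `proceed()` or `conclude()`, which is what the contract requires.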
Source code in bandsaw/advices/ssh.py
def before(self, session):
temp_dir = self.directory or session.temp_dir
session_in_path = temp_dir / f'session-{session.session_id}-in.zip'
session_out_path = temp_dir / f'session-{session.session_id}-out.zip'
logger.info("Writing session to %s", session_in_path)
with io.FileIO(str(session_in_path), mode='w') as stream:
session.save(stream)
parameters = session.task.advice_parameters
remote_name = parameters.get('ssh', {}).get('remote', 'default')
remote = self.remotes[remote_name]
remote_run_directory = remote.directory / session.execution.execution_id
logger.info(
"Creating run directory %s on host %s",
remote_run_directory,
remote.host,
)
self._backend.create_dir(
remote,
remote_run_directory,
)
logger.info("Copying over distribution archive to host %s", remote.host)
distribution_archive_path = session.distribution_archive.path
remote_distribution_archive_path = (
remote_run_directory / distribution_archive_path.name
)
self._backend.copy_file_to_remote(
remote,
distribution_archive_path,
remote_distribution_archive_path,
)
logger.info("Copying over session to host %s", remote.host)
remote_session_in_path = remote_run_directory / session_in_path.name
self._backend.copy_file_to_remote(
remote,
session_in_path,
remote_session_in_path,
)
remote_session_out_path = remote_run_directory / session_out_path.name
logger.info(
"Running remote process using interpreter %s", remote.interpreter.executable
)
self._backend.execute_remote(
remote,
remote.interpreter.executable,
str(remote_distribution_archive_path),
'--input',
str(remote_session_in_path),
'--output',
str(remote_session_out_path),
'--run-id',
session.run_id,
)
# environment = self.interpreter.environment
# environment['PYTHONPATH'] = ':'.join(self.interpreter.path)
logger.info("Remote process exited")
logger.info("Copying over session result from host %s", remote.host)
self._backend.copy_file_from_remote(
remote,
remote_session_out_path,
session_out_path,
)
logger.info(
"Cleaning up remote directory %s on host %s",
remote_run_directory,
remote.host,
)
self._backend.delete_dir(remote, remote_run_directory)
logger.info("Restore local session from %s", session_out_path)
with io.FileIO(str(session_out_path), mode='r') as stream:
session.restore(stream)
logger.info(
"Cleaning up local sessions %s, %s",
session_in_path,
session_out_path,
)
session_in_path.unlink()
session_out_path.unlink()
logger.info("Proceed local session")
session.proceed()
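`before()` performs a full round trip: it saves the session to `session-<id>-in.zip`, creates a run directory named after the execution ID below `remote.directory`, copies the distribution archive and the session there, runs the remote interpreter, copies `session-<id>-out.zip` back, deletes the remote directory, and restores the session locally. The sketch below reproduces only the path naming and the remote command line. All argument values, including the archive name `dist.pyz`, are hypothetical, and `PurePosixPath` is used for the local paths to keep the output deterministic, whereas the source uses `pathlib.Path`.

```python
from pathlib import PurePosixPath


def plan_remote_run(temp_dir, remote_directory, interpreter, archive_name,
                    session_id, execution_id, run_id):
    """Derive the exchange files and remote command the way SshAdvice.before() does."""
    temp_dir = PurePosixPath(temp_dir)
    session_in = temp_dir / f'session-{session_id}-in.zip'
    session_out = temp_dir / f'session-{session_id}-out.zip'
    # One run directory per execution on the remote machine
    run_dir = PurePosixPath(remote_directory) / execution_id
    command = [
        interpreter,
        str(run_dir / archive_name),
        '--input', str(run_dir / session_in.name),
        '--output', str(run_dir / session_out.name),
        '--run-id', run_id,
    ]
    return session_in, session_out, run_dir, command


session_in, session_out, run_dir, command = plan_remote_run(
    '/tmp/bandsaw-x1', '/srv/bandsaw', '/usr/bin/python3', 'dist.pyz',
    session_id='abc', execution_id='exec-1', run_id='run-7',
)
print(run_dir)  # /srv/bandsaw/exec-1
print(command)
```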
SshBackend (ABC)
Interface definition for different SSH backends.
Source code in bandsaw/advices/ssh.py
class SshBackend(abc.ABC):
"""
Interface definition for different SSH backends.
"""
def create_dir(self, remote, remote_path):
"""
Create a directory on a remote machine.
Args:
remote (bandsaw.advices.ssh.Remote): Remote machine where a directory will
be created.
remote_path (str): Remote path to the directory that should be created.
"""
def copy_file_to_remote(self, remote, local_path, remote_path):
"""
Copies a local file or directory to a remote machine.
Args:
remote (bandsaw.advices.ssh.Remote): Remote machine to which a file should
be copied.
local_path (pathlib.Path): Local path to the file which should be copied
over.
remote_path (pathlib.Path): Remote path of the file where it should be
copied to.
"""
def copy_file_from_remote(self, remote, remote_path, local_path):
"""
Copies a remote file or directory to the local file system.
Args:
remote (bandsaw.advices.ssh.Remote): Remote machine from which a file
should be copied.
remote_path (pathlib.Path): Remote path of the file which should be
copied.
local_path (pathlib.Path): Local path to the file where it should be
copied to.
"""
def execute_remote(self, remote, executable, *arguments):
"""
Executes an executable on a remote machine.
Args:
remote (bandsaw.advices.ssh.Remote): Remote machine where `executable`
will be executed.
executable (str): Remote path of the executable which should be executed.
*arguments (str): Positional arguments which are the command line
parameters for the `executable`.
Raises:
subprocess.CalledProcessError: If the remote process ends with an error.
Its return code will be available through the exception.
"""
def delete_dir(self, remote, remote_path):
"""
Delete a directory on a remote machine.
Args:
remote (bandsaw.advices.ssh.Remote): Remote machine where a directory will
be deleted.
remote_path (str): Remote path to the directory that should be deleted.
"""
copy_file_from_remote(self, remote, remote_path, local_path)
Copies a remote file or directory to the local file system.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine from which a file should be copied. | required |
remote_path | pathlib.Path | Remote path of the file which should be copied. | required |
local_path | pathlib.Path | Local path to the file where it should be copied to. | required |
Source code in bandsaw/advices/ssh.py
def copy_file_from_remote(self, remote, remote_path, local_path):
"""
Copies a remote file or directory to the local file system.
Args:
remote (bandsaw.advices.ssh.Remote): Remote machine from which a file
should be copied.
remote_path (pathlib.Path): Remote path of the file which should be
copied.
local_path (pathlib.Path): Local path to the file where it should be
copied to.
"""
copy_file_to_remote(self, remote, local_path, remote_path)
Copies a local file or directory to a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine to which a file should be copied. | required |
local_path | pathlib.Path | Local path to the file which should be copied over. | required |
remote_path | pathlib.Path | Remote path of the file where it should be copied to. | required |
Source code in bandsaw/advices/ssh.py
def copy_file_to_remote(self, remote, local_path, remote_path):
"""
Copies a local file or directory to a remote machine.
Args:
remote (bandsaw.advices.ssh.Remote): Remote machine to which a file should
be copied.
local_path (pathlib.Path): Local path to the file which should be copied
over.
remote_path (pathlib.Path): Remote path of the file where it should be
copied to.
"""
create_dir(self, remote, remote_path)
Create a directory on a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine where a directory will be created. | required |
remote_path | str | Remote path to the directory that should be created. | required |
Source code in bandsaw/advices/ssh.py
def create_dir(self, remote, remote_path):
"""
Create a directory on a remote machine.
Args:
remote (bandsaw.advices.ssh.Remote): Remote machine where a directory will
be created.
remote_path (str): Remote path to the directory that should be created.
"""
delete_dir(self, remote, remote_path)
Delete a directory on a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine where a directory will be deleted. | required |
remote_path | str | Remote path to the directory that should be deleted. | required |
Source code in bandsaw/advices/ssh.py
def delete_dir(self, remote, remote_path):
"""
Delete a directory on a remote machine.
Args:
remote (bandsaw.advices.ssh.Remote): Remote machine where a directory will
be deleted.
remote_path (str): Remote path to the directory that should be deleted.
"""
execute_remote(self, remote, executable, *arguments)
Executes an executable on a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine where `executable` will be executed. | required |
executable | str | Remote path of the executable which should be executed. | required |
*arguments | str | Positional arguments which are the command line parameters for the `executable`. | () |
Exceptions:
Type | Description |
---|---|
subprocess.CalledProcessError | If the remote process ends with an error. Its return code will be available through the exception. |
Source code in bandsaw/advices/ssh.py
def execute_remote(self, remote, executable, *arguments):
"""
Executes an executable on a remote machine.
Args:
remote (bandsaw.advices.ssh.Remote): Remote machine where `executable`
will be executed.
executable (str): Remote path of the executable which should be executed.
*arguments (str): Positional arguments which are the command line
parameters for the `executable`.
Raises:
subprocess.CalledProcessError: If the remote process ends with an error.
Its return code will be available through the exception.
"""
SshCommandLineBackend (SshBackend)
SSH backend that uses the SSH command line tools.
Source code in bandsaw/advices/ssh.py
class SshCommandLineBackend(SshBackend):
"""
SSH backend that uses the SSH command line tools.
"""
def create_dir(self, remote, remote_path):
return self._run(
[
'ssh',
'-p',
str(remote.port),
]
+ self._key_file_option(remote)
+ [
self.login(remote),
'mkdir',
'-p',
str(remote_path),
]
)
def copy_file_to_remote(self, remote, local_path, remote_path):
copy_destination = self.get_remote_path(remote, remote_path)
self._run(
[
'scp',
'-P',
str(remote.port),
]
+ self._key_file_option(remote)
+ [
str(local_path),
copy_destination,
],
)
def copy_file_from_remote(self, remote, remote_path, local_path):
copy_source = self.get_remote_path(remote, remote_path)
self._run(
[
'scp',
'-P',
str(remote.port),
]
+ self._key_file_option(remote)
+ [
copy_source,
str(local_path),
],
)
def execute_remote(self, remote, executable, *arguments):
return self._run(
[
'ssh',
'-p',
str(remote.port),
]
+ self._key_file_option(remote)
+ [
self.login(remote),
str(executable),
]
+ list(arguments),
)
def delete_dir(self, remote, remote_path):
return self._run(
[
'ssh',
'-p',
str(remote.port),
]
+ self._key_file_option(remote)
+ [
self.login(remote),
'rm',
'-Rf',
str(remote_path),
]
)
@staticmethod
def _run(command):
logger.debug("running command %s", command)
subprocess.check_call(command)
@staticmethod
def login(remote):
"""Returns the destination of the remote in form of <user>@<host>"""
return f"{remote.user}@{remote.host}"
def get_remote_path(self, remote, path):
"""Returns the remote path in form of <user>@<host>:<path>"""
return f"{self.login(remote)}:{path.absolute()}"
@staticmethod
def _key_file_option(remote):
if remote.key_file is not None:
return [
'-i',
remote.key_file,
]
return []
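To see what `SshCommandLineBackend` actually runs, the sketch below rebuilds the argument lists for `create_dir()` and `copy_file_to_remote()` with plain functions. `Remote` is a hypothetical stand-in with only the attributes the backend reads, and all host names, ports, and paths are invented. Note that `ssh` takes the port with a lower-case `-p` while `scp` uses an upper-case `-P`, which the backend handles correctly.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath
from typing import Optional


@dataclass
class Remote:
    """Hypothetical stand-in with the attributes SshCommandLineBackend reads."""
    host: str
    user: str
    port: int = 22
    key_file: Optional[str] = None


def key_file_option(remote):
    # Mirrors _key_file_option(): only pass -i when a key file is configured
    return ['-i', remote.key_file] if remote.key_file is not None else []


def login(remote):
    return f"{remote.user}@{remote.host}"


def create_dir_command(remote, remote_path):
    # ssh selects the port with a lower-case -p ...
    return (['ssh', '-p', str(remote.port)] + key_file_option(remote)
            + [login(remote), 'mkdir', '-p', str(remote_path)])


def copy_to_remote_command(remote, local_path, remote_path):
    # ... while scp uses an upper-case -P
    return (['scp', '-P', str(remote.port)] + key_file_option(remote)
            + [str(local_path), f"{login(remote)}:{remote_path}"])


remote = Remote(host='build-01', user='alice', port=2222, key_file='id_ed25519')
print(create_dir_command(remote, PurePosixPath('/srv/bandsaw/exec-1')))
print(copy_to_remote_command(remote, 'session-abc-in.zip',
                             PurePosixPath('/srv/bandsaw/exec-1/session-abc-in.zip')))
```

Two details of the real backend are worth keeping in mind. `get_remote_path()` calls `path.absolute()` on a local `pathlib.Path`, so a relative remote path would be resolved against the local working directory; absolute remote directories avoid that. Also, `ssh` joins the remote arguments into a single string that the remote shell parses, so paths containing spaces or shell metacharacters would need quoting.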
copy_file_from_remote(self, remote, remote_path, local_path)
Copies a remote file or directory to the local file system.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine from which a file should be copied. | required |
remote_path | pathlib.Path | Remote path of the file which should be copied. | required |
local_path | pathlib.Path | Local path to the file where it should be copied to. | required |
Source code in bandsaw/advices/ssh.py
def copy_file_from_remote(self, remote, remote_path, local_path):
copy_source = self.get_remote_path(remote, remote_path)
self._run(
[
'scp',
'-P',
str(remote.port),
]
+ self._key_file_option(remote)
+ [
copy_source,
str(local_path),
],
)
copy_file_to_remote(self, remote, local_path, remote_path)
Copies a local file or directory to a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine to which a file should be copied. | required |
local_path | pathlib.Path | Local path to the file which should be copied over. | required |
remote_path | pathlib.Path | Remote path of the file where it should be copied to. | required |
Source code in bandsaw/advices/ssh.py
def copy_file_to_remote(self, remote, local_path, remote_path):
copy_destination = self.get_remote_path(remote, remote_path)
self._run(
[
'scp',
'-P',
str(remote.port),
]
+ self._key_file_option(remote)
+ [
str(local_path),
copy_destination,
],
)
create_dir(self, remote, remote_path)
Create a directory on a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine where a directory will be created. | required |
remote_path | str | Remote path to the directory that should be created. | required |
Source code in bandsaw/advices/ssh.py
def create_dir(self, remote, remote_path):
return self._run(
[
'ssh',
'-p',
str(remote.port),
]
+ self._key_file_option(remote)
+ [
self.login(remote),
'mkdir',
'-p',
str(remote_path),
]
)
delete_dir(self, remote, remote_path)
Delete a directory on a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine where a directory will be deleted. | required |
remote_path | str | Remote path to the directory that should be deleted. | required |
Source code in bandsaw/advices/ssh.py
def delete_dir(self, remote, remote_path):
return self._run(
[
'ssh',
'-p',
str(remote.port),
]
+ self._key_file_option(remote)
+ [
self.login(remote),
'rm',
'-Rf',
str(remote_path),
]
)
execute_remote(self, remote, executable, *arguments)
Executes an executable on a remote machine.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remote | bandsaw.advices.ssh.Remote | Remote machine where `executable` will be executed. | required |
executable | str | Remote path of the executable which should be executed. | required |
*arguments | str | Positional arguments which are the command line parameters for the `executable`. | () |
Exceptions:
Type | Description |
---|---|
subprocess.CalledProcessError | If the remote process ends with an error. Its return code will be available through the exception. |
Source code in bandsaw/advices/ssh.py
def execute_remote(self, remote, executable, *arguments):
return self._run(
[
'ssh',
'-p',
str(remote.port),
]
+ self._key_file_option(remote)
+ [
self.login(remote),
str(executable),
]
+ list(arguments),
)
get_remote_path(self, remote, path)
Returns the remote path in the form of `<user>@<host>:<path>`.
Source code in bandsaw/advices/ssh.py
def get_remote_path(self, remote, path):
"""Returns the remote path in form of <user>@<host>:<path>"""
return f"{self.login(remote)}:{path.absolute()}"
login(remote)
staticmethod
Returns the destination of the remote in the form of `<user>@<host>`.
Source code in bandsaw/advices/ssh.py
@staticmethod
def login(remote):
"""Returns the destination of the remote in form of <user>@<host>"""
return f"{remote.user}@{remote.host}"
subprocess
Contains an `Advice` implementation that runs the execution in a subprocess.
SubprocessAdvice (Advice)
Advice that runs in a subprocess
Source code in bandsaw/advices/subprocess.py
class SubprocessAdvice(Advice):
"""Advice that runs in a subprocess"""
def __init__(self, directory=None, interpreter=None):
"""
Create a new instance.
Args:
directory (str): The directory where temporary files are stored to
exchange data between both processes. If `None` a temporary directory
is used.
interpreter (bandsaw.interpreter.Interpreter): The interpreter to use in
the subprocess. If `None` the same interpreter will be used.
"""
if directory is None:
self.directory = None
logger.info("Using temporary session directory")
else:
self.directory = pathlib.Path(directory)
logger.info("Using directory %s", self.directory)
self.interpreter = interpreter or Interpreter()
super().__init__()
def before(self, session):
logger.info("before called in process %d", os.getpid())
temp_dir = self.directory or session.temp_dir
session_in_path = temp_dir / f'session-{session.session_id}-in.zip'
session_out_path = temp_dir / f'session-{session.session_id}-out.zip'
archive_path = session.distribution_archive.path
logger.info("Writing session to %s", session_in_path)
with io.FileIO(session_in_path, mode='w') as stream:
session.save(stream)
logger.info(
"Continue session in subprocess using interpreter %s and "
"distribution archive %s",
self.interpreter.executable,
archive_path,
)
environment = self.interpreter.environment
environment['PYTHONPATH'] = ':'.join(self.interpreter.path)
subprocess.check_call(
[
self.interpreter.executable,
archive_path,
'--input',
session_in_path,
'--output',
session_out_path,
'--run-id',
session.run_id,
],
env=environment,
)
logger.info("Sub process exited")
logger.info("Reading session from %s", session_out_path)
with io.FileIO(session_out_path, mode='r') as stream:
session.restore(stream)
logger.info(
"Cleaning up session files %s, %s",
session_in_path,
session_out_path,
)
session_in_path.unlink()
session_out_path.unlink()
logger.info("proceed() session in parent process")
session.proceed()
def after(self, session):
logger.info("after called in process %d", os.getpid())
logger.info("Sub process created result %s", session.result)
logger.info("Returning to end session and continue in parent")
__init__(self, directory=None, interpreter=None)
special
Create a new instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
directory | str | The directory where temporary files are stored to exchange data between both processes. If `None`, the temporary directory of the session is used. | None |
interpreter | bandsaw.interpreter.Interpreter | The interpreter to use in the subprocess. If `None`, the same interpreter will be used. | None |
Source code in bandsaw/advices/subprocess.py
def __init__(self, directory=None, interpreter=None):
"""
Create a new instance.
Args:
directory (str): The directory where temporary files are stored to
exchange data between both processes. If `None` a temporary directory
is used.
interpreter (bandsaw.interpreter.Interpreter): The interpreter to use in
the subprocess. If `None` the same interpreter will be used.
"""
if directory is None:
self.directory = None
logger.info("Using temporary session directory")
else:
self.directory = pathlib.Path(directory)
logger.info("Using directory %s", self.directory)
self.interpreter = interpreter or Interpreter()
super().__init__()
after(self, session)
Called after the task is actually executed.
This method allows the individual advice to make changes to the result of
the task execution. The result can be retrieved from the `session`.
In order to continue, the advice MUST either call `session.proceed()`, which
will continue the process with the current result and the next advice in the
advice chain, or call `session.conclude(result)` with a `Result` instance,
which will set a different result and continue with it.
The default implementation will just call `session.proceed()`. The
`SubprocessAdvice` implementation only logs the result and returns; according
to its log messages, this ends the session in the subprocess, after which the
parent process continues.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
Source code in bandsaw/advices/subprocess.py
def after(self, session):
logger.info("after called in process %d", os.getpid())
logger.info("Sub process created result %s", session.result)
logger.info("Returning to end session and continue in parent")
before(self, session)
Called before the task is actually executed.
This method allows the individual advice to change the way the task
execution is later carried out. In order to continue, the advice MUST either
call `session.proceed()`, which will continue the process with the next
advice in the advice chain, or call `session.conclude(result)` with a `Result`
instance, which will skip the following advices and return without executing
the task at all.
The default implementation will just call `session.proceed()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session of the execution. | required |
Source code in bandsaw/advices/subprocess.py
def before(self, session):
logger.info("before called in process %d", os.getpid())
temp_dir = self.directory or session.temp_dir
session_in_path = temp_dir / f'session-{session.session_id}-in.zip'
session_out_path = temp_dir / f'session-{session.session_id}-out.zip'
archive_path = session.distribution_archive.path
logger.info("Writing session to %s", session_in_path)
with io.FileIO(session_in_path, mode='w') as stream:
session.save(stream)
logger.info(
"Continue session in subprocess using interpreter %s and "
"distribution archive %s",
self.interpreter.executable,
archive_path,
)
environment = self.interpreter.environment
environment['PYTHONPATH'] = ':'.join(self.interpreter.path)
subprocess.check_call(
[
self.interpreter.executable,
archive_path,
'--input',
session_in_path,
'--output',
session_out_path,
'--run-id',
session.run_id,
],
env=environment,
)
logger.info("Sub process exited")
logger.info("Reading session from %s", session_out_path)
with io.FileIO(session_out_path, mode='r') as stream:
session.restore(stream)
logger.info(
"Cleaning up session files %s, %s",
session_in_path,
session_out_path,
)
session_in_path.unlink()
session_out_path.unlink()
logger.info("proceed() session in parent process")
session.proceed()
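`before()` builds the child's environment by writing `PYTHONPATH` into `self.interpreter.environment`, joining the path entries with `':'`. Whether that property returns a fresh dictionary is not shown on this page. The sketch below is a defensive variant under that uncertainty: it copies the mapping first and uses `os.pathsep`, which is `':'` on POSIX and `';'` on Windows. The function name `child_environment` is hypothetical.

```python
import os


def child_environment(base_environment, path_entries):
    """Return a copy of base_environment with PYTHONPATH built from path_entries."""
    environment = dict(base_environment)  # copy, so the caller's mapping stays unchanged
    # os.pathsep is ':' on POSIX systems and ';' on Windows
    environment['PYTHONPATH'] = os.pathsep.join(str(entry) for entry in path_entries)
    return environment


base = {'HOME': '/home/alice'}
environment = child_environment(base, ['/opt/lib', '/srv/app'])
print(environment['PYTHONPATH'])  # /opt/lib:/srv/app on POSIX
```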
config
Contains the class and functions to configure bandsaw.
Configuration
Class that represents a configuration for bandsaw.
Attributes:
Name | Type | Description |
---|---|---|
temporary_directory | pathlib.Path | The path to a directory where temporary files are stored. |
Source code in bandsaw/config.py
class Configuration:
"""
Class that represents a configuration for bandsaw.
Attributes:
temporary_directory (pathlib.Path): The path to a directory where temporary
files are stored.
"""
def __init__(self):
self._advice_chains = {}
self.extensions = []
self.serializer = PickleSerializer()
self.add_advice_chain()
stack = traceback.extract_stack(limit=2)
config_module_file_path = stack[0].filename
self.module_name = get_loaded_module_name_by_path(config_module_file_path)
logger.info("Config created in module: %s", self.module_name)
self.distribution_modules = []
self.temporary_directory = None
self.set_temp_directory(tempfile.mkdtemp(prefix='bandsaw'))
def add_advice_chain(self, *advices, name='default'):
"""
Add a new advice chain to the configuration.
Each advice chain has a unique `name`. If multiple chains with the same name
are added to the configuration, the last chain overwrites all previous chains.
Args:
*advices (bandsaw.advice.Advice): A tuple of advices for this chain.
name (str): The name of the advice chain, defaults to 'default' if not
specified.
Returns:
bandsaw.config.Configuration: The configuration to which the chain was
added.
"""
self._advice_chains[name] = advices
return self
def get_advice_chain(self, name):
"""
Returns the advice chain with the given name.
Args:
name (str): Name of the wanted advice chain.
Returns:
List[bandsaw.advice.Advice]: The advice chain with the given name.
Raises:
KeyError: If no chain with the specified name is configured.
"""
return self._advice_chains[name]
def add_extension(self, extension):
"""
Add an `Extension` to the configuration.
`Extensions` are objects that can implement callbacks to be informed by
bandsaw about certain conditions, e.g. the creation of new tasks or the final
result of an execution.
Args:
extension (bandsaw.extension.Extension): An object implementing the
`Extension`.
Returns:
bandsaw.config.Configuration: The configuration to which the extension was
added.
"""
self.extensions.append(extension)
return self
def set_serializer(self, serializer):
"""
Sets the serializer which defines how tasks and results will be serialized.
Args:
serializer (bandsaw.serialization.Serializer): The serializer to use for
serializing objects.
Returns:
bandsaw.config.Configuration: The configuration with the new serializer.
"""
self.serializer = serializer
return self
def add_modules_for_distribution(self, *modules):
"""
Add modules that should be included in the distribution archive.
Args:
*modules (List[str]): Positional arguments with strings, that contain the
names of modules, which should be included in the distribution
archive.
Returns:
bandsaw.config.Configuration: The configuration with the added modules.
"""
self.distribution_modules.extend(modules)
return self
def set_temp_directory(self, directory):
"""
Sets the temporary directory.
Args:
directory (Union[str, pathlib.Path]): Path to the directory, where
temporary files will be stored.
"""
self.temporary_directory = pathlib.Path(directory)
atexit.register(
lambda path: shutil.rmtree(path, ignore_errors=True),
str(self.temporary_directory),
)
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
return self.module_name == other.module_name
def __hash__(self):
return hash(self.module_name)
add_advice_chain(self, *advices, name='default')
Add a new advice chain to the configuration.
Each advice chain has a unique `name`. If multiple chains with the same name
are added to the configuration, the last chain overwrites all previous chains.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*advices | bandsaw.advice.Advice | A tuple of advices for this chain. | () |
name | str | The name of the advice chain, defaults to 'default' if not specified. | 'default' |
Returns:
Type | Description |
---|---|
bandsaw.config.Configuration | The configuration to which the chain was added. |
Source code in bandsaw/config.py
def add_advice_chain(self, *advices, name='default'):
"""
Add a new advice chain to the configuration.
Each advice chain has a unique `name`. If multiple chains with the same name
are added to the configuration, the last chain overwrites all previous chains.
Args:
*advices (bandsaw.advice.Advice): A tuple of advices for this chain.
name (str): The name of the advice chain, defaults to 'default' if not
specified.
Returns:
bandsaw.config.Configuration: The configuration to which the chain was
added.
"""
self._advice_chains[name] = advices
return self
add_extension(self, extension)
Add an `Extension` to the configuration.
`Extensions` are objects that can implement callbacks to be informed by
bandsaw about certain conditions, e.g. the creation of new tasks or the final
result of an execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
extension | bandsaw.extension.Extension | An object implementing the `Extension` interface. | required |
Returns:
Type | Description |
---|---|
bandsaw.config.Configuration | The configuration to which the extension was added. |
Source code in bandsaw/config.py
def add_extension(self, extension):
"""
Add an `Extension` to the configuration.
`Extensions` are objects that can implement callbacks to be informed by
bandsaw about certain conditions, e.g. the creation of new tasks or the final
result of an execution.
Args:
extension (bandsaw.extension.Extension): An object implementing the
`Extension`.
Returns:
bandsaw.config.Configuration: The configuration to which the extension was
added.
"""
self.extensions.append(extension)
return self
add_modules_for_distribution(self, *modules)
Add modules that should be included in the distribution archive.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*modules | List[str] | Names of the modules that should be included in the distribution archive, passed as positional arguments. | () |
Returns:
Type | Description |
---|---|
bandsaw.config.Configuration | The configuration with the added modules. |
Source code in bandsaw/config.py
def add_modules_for_distribution(self, *modules):
"""
Add modules that should be included in the distribution archive.
Args:
*modules (List[str]): Positional arguments with strings, that contain the
names of modules, which should be included in the distribution
archive.
Returns:
bandsaw.config.Configuration: The configuration with the added modules.
"""
self.distribution_modules.extend(modules)
return self
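This page does not show how bandsaw builds the distribution archive from `distribution_modules`. Both advices, however, start it as `[interpreter, archive_path, '--input', ...]`, so the interpreter runs the archive like a script, which Python supports for zip files that contain a top-level `__main__.py`. Assuming the archive is such a zip application, the standard-library `zipapp` module demonstrates the mechanism. The `__main__.py` below is a placeholder, not bandsaw's entry point.

```python
import subprocess
import sys
import tempfile
import zipapp
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / 'src'
    source.mkdir()
    # __main__.py is what the interpreter runs when it is given a zip archive
    (source / '__main__.py').write_text(
        "import sys\nprint('archive called with', sys.argv[1:])\n"
    )
    archive = Path(tmp) / 'distribution.pyz'
    zipapp.create_archive(source, archive)
    output = subprocess.check_output(
        [sys.executable, str(archive), '--run-id', 'run-7'], text=True
    )

print(output.strip())  # archive called with ['--run-id', 'run-7']
```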
get_advice_chain(self, name)
Returns the advice chain with the given name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the wanted advice chain. | required |
Returns:
Type | Description |
---|---|
List[bandsaw.advice.Advice] | The advice chain with the given name. |
Exceptions:
Type | Description |
---|---|
KeyError | If no chain with the specified name is configured. |
Source code in bandsaw/config.py
def get_advice_chain(self, name):
"""
Returns the advice chain with the given name.
Args:
name (str): Name of the wanted advice chain.
Returns:
List[bandsaw.advice.Advice]: The advice chain with the given name.
Raises:
KeyError: If no chain with the specified name is configured.
"""
return self._advice_chains[name]
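The chain handling of `Configuration` follows a small registry pattern: `__init__` registers an empty `'default'` chain, `add_advice_chain()` replaces any chain with the same name and returns the configuration for chaining, and `get_advice_chain()` is documented to raise `KeyError` for unknown names. The hypothetical `AdviceChains` class below reproduces just that behaviour, with strings standing in for advice instances.

```python
class AdviceChains:
    """Hypothetical stand-in for the chain handling in bandsaw.config.Configuration."""

    def __init__(self):
        self._advice_chains = {}
        self.add_advice_chain()  # like Configuration.__init__: empty 'default' chain

    def add_advice_chain(self, *advices, name='default'):
        self._advice_chains[name] = advices  # a later chain replaces an earlier one
        return self  # allows chained calls

    def get_advice_chain(self, name):
        return self._advice_chains[name]  # raises KeyError, as documented


chains = (
    AdviceChains()
    .add_advice_chain('logging', 'subprocess')
    .add_advice_chain('ssh', name='remote')
    .add_advice_chain('logging', name='remote')  # overwrites the 'remote' chain
)
print(chains.get_advice_chain('default'))  # ('logging', 'subprocess')
print(chains.get_advice_chain('remote'))   # ('logging',)
```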
set_serializer(self, serializer)
Sets the serializer which defines how tasks and results will be serialized.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
serializer | bandsaw.serialization.Serializer | The serializer to use for serializing objects. | required |
Returns:
Type | Description |
---|---|
bandsaw.config.Configuration | The configuration with the new serializer. |
Source code in bandsaw/config.py
def set_serializer(self, serializer):
"""
Sets the serializer which defines how tasks and results will be serialized.
Args:
serializer (bandsaw.serialization.Serializer): The serializer to use for
serializing objects.
Returns:
bandsaw.config.Configuration: The configuration with the new serializer.
"""
self.serializer = serializer
return self
set_temp_directory(self, directory)
Sets the temporary directory. The directory is deleted, including its contents, when the Python interpreter exits.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
directory | Union[str, pathlib.Path] | Path to the directory where temporary files will be stored. | required |
Source code in bandsaw/config.py
def set_temp_directory(self, directory):
"""
Sets the temporary directory.
Args:
directory (Union[str, pathlib.Path]): Path to the directory, where
temporary files will be stored.
"""
self.temporary_directory = pathlib.Path(directory)
atexit.register(
lambda path: shutil.rmtree(path, ignore_errors=True),
str(self.temporary_directory),
)
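`set_temp_directory()` registers `shutil.rmtree(path, ignore_errors=True)` with `atexit` for whatever directory it receives. As a result, a directory supplied by the user is removed at exit as well, just like the one `__init__` creates with `tempfile.mkdtemp(prefix='bandsaw')`. The sketch below shows the same pattern with `functools.partial` in place of the lambda and returns the callback so its effect can be observed. The helper name `make_temp_directory` is hypothetical.

```python
import atexit
import functools
import pathlib
import shutil
import tempfile


def make_temp_directory(prefix='bandsaw'):
    """Create a temporary directory that is removed when the interpreter exits."""
    path = pathlib.Path(tempfile.mkdtemp(prefix=prefix))
    cleanup = functools.partial(shutil.rmtree, str(path), ignore_errors=True)
    atexit.register(cleanup)
    return path, cleanup


path, cleanup = make_temp_directory()
print(path.is_dir())  # True
cleanup()             # what atexit runs at interpreter shutdown
print(path.exists())  # False
```

Because of `ignore_errors=True`, the second call that `atexit` makes at shutdown is harmless even though the directory is already gone.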
get_configuration(configuration_module=None)
Return a configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
configuration_module | str | The module name of a module, which contains the configuration. The module needs to define a member `configuration`, which contains an instance of `Configuration`. If no module name is given, the name is taken from the `BANDSAW_CONFIG_MODULE` environment variable, and if this variable is not set, `bandsaw_config` is used. | None |
Returns:
Type | Description |
---|---|
bandsaw.config.Configuration | The configuration. |
Exceptions:
Type | Description |
---|---|
ModuleNotFoundError | If no module exists with name `configuration_module`. |
LookupError | If the module doesn't contain a variable `configuration`. |
TypeError | If the variable `configuration` is not of type `Configuration`. |
Source code in bandsaw/config.py
def get_configuration(configuration_module=None):
    """
    Return a configuration.

    Args:
        configuration_module (str): The module name of a module, which contains the
            configuration. The module needs to define a member 'configuration', which
            contains an instance of `Configuration`. If no module name is given, a
            default configuration is returned based on the value of the
            `BANDSAW_CONFIG_MODULE` environment variable. If this variable is not set,
            we default to 'bandsaw_config'.

    Returns:
        bandsaw.config.Configuration: The configuration.

    Raises:
        ModuleNotFoundError: If no module exists with name `configuration_module`.
        LookupError: If the module doesn't contain a variable `configuration`.
        TypeError: If the variable `configuration` is not of type `Configuration`.
    """
    if configuration_module is None:
        default_configuration_module_name = os.getenv(
            CONFIGURATION_MODULE_ENV_VARIABLE,
            CONFIGURATION_MODULE_DEFAULT,
        )
        configuration_module = default_configuration_module_name
    if configuration_module not in _configurations:
        try:
            _load_configuration_module(configuration_module)
        except ModuleNotFoundError:
            logger.warning(
                "No module found for config %s",
                configuration_module,
            )
            raise
    return _configurations[configuration_module]
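The lookup order used by get_configuration() can be sketched without bandsaw installed. The snippet below is a simplified stand-in, not the library's code: it reuses the constant names from the source above with the values given in the docstring, replaces the unshown _load_configuration_module() with a plain importlib lookup (exactly where bandsaw raises LookupError is an assumption here, and the TypeError check is omitted), and registers in-memory modules so no config file is needed.

```python
import importlib
import os
import sys
import types

# Constant names as used in get_configuration(); values taken from its docstring.
CONFIGURATION_MODULE_ENV_VARIABLE = 'BANDSAW_CONFIG_MODULE'
CONFIGURATION_MODULE_DEFAULT = 'bandsaw_config'

_configurations = {}  # cache: module name -> configuration object


def resolve_configuration(configuration_module=None):
    """Hypothetical, simplified stand-in for get_configuration()."""
    if configuration_module is None:
        # Explicit argument > environment variable > built-in default.
        configuration_module = os.getenv(
            CONFIGURATION_MODULE_ENV_VARIABLE, CONFIGURATION_MODULE_DEFAULT
        )
    if configuration_module not in _configurations:
        # Raises ModuleNotFoundError if the module does not exist.
        module = importlib.import_module(configuration_module)
        if not hasattr(module, 'configuration'):
            raise LookupError(f"No 'configuration' in {configuration_module}")
        _configurations[configuration_module] = module.configuration
    return _configurations[configuration_module]


# In-memory modules, so the example runs without files on disk.
fake_module = types.ModuleType('bandsaw_config')
fake_module.configuration = {'name': 'example'}  # placeholder for a Configuration
sys.modules['bandsaw_config'] = fake_module
sys.modules['empty_config'] = types.ModuleType('empty_config')

os.environ.pop(CONFIGURATION_MODULE_ENV_VARIABLE, None)
print(resolve_configuration())  # {'name': 'example'}
```

Because results are cached per module name, a configuration module is imported at most once per process, so every task that names the same module shares one configuration object.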
context
Classes that represent the context used in advising tasks.
Context (SerializableValue)
Class for representing the context for advising tasks.
The context consists of a set of arbitrary key-value mappings that Advice classes can use to store state or to communicate with other advices.
Source code in bandsaw/context.py
class Context(SerializableValue):
    """
    Class for representing the context for advising tasks.

    The context consists of a set of arbitrary key-value mappings that can be used
    by the `Advice` classes to store state or communicate with other advices.
    """

    def __init__(self, attributes=None):
        self._attributes = attributes or {}

    def serialized(self):
        data = {
            'attributes': self._attributes,
        }
        return data

    @classmethod
    def deserialize(cls, values):
        return Context(values['attributes'])

    @property
    def attributes(self):
        """
        A set of arbitrary key-value mappings for the `Advice` classes.

        `Advice` can add to this mapping and use this as a way of keeping state.
        """
        return self._attributes

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self._attributes == other._attributes
attributes (property, readonly)
A set of arbitrary key-value mappings for the Advice classes. An Advice can add to this mapping and use it as a way of keeping state.
deserialize(values) (classmethod)
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/context.py
@classmethod
def deserialize(cls, values):
    return Context(values['attributes'])
serialized(self)
Returns a serializable representation of the value.
Source code in bandsaw/context.py
def serialized(self):
    data = {
        'attributes': self._attributes,
    }
    return data
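A context round trip can be illustrated with a local copy of the class above. This is a sketch under two assumptions: the SerializableValue base class is dropped so the snippet runs without bandsaw, and JSON stands in for whichever serializer the configuration actually uses. The key name 'my_advice.started' is hypothetical.

```python
import json


class Context:
    """Local copy of bandsaw's Context, without the SerializableValue base."""

    def __init__(self, attributes=None):
        self._attributes = attributes or {}

    def serialized(self):
        return {'attributes': self._attributes}

    @classmethod
    def deserialize(cls, values):
        return cls(values['attributes'])

    @property
    def attributes(self):
        return self._attributes

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self._attributes == other._attributes


context = Context()
# "readonly" only means the property cannot be reassigned; the mapping it
# returns is mutable, which is how an advice stores its state.
context.attributes['my_advice.started'] = True

# Serialize to plain data, encode as JSON, then rebuild the context.
payload = json.dumps(context.serialized())
restored = Context.deserialize(json.loads(payload))
print(restored == context)  # True
```

Namespacing keys by advice (as with the hypothetical 'my_advice.' prefix) is one way to keep several advices from overwriting each other's state in the shared mapping.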
decorator
Contains decorators for defining individual tasks.
task(*task_args, config=None, chain=None, **task_kwargs)
Decorator that is used to define a function as a task.
The decorator can be used in two different ways, standalone:
Examples:
>>> @task
... def my_task_function():
...     pass
or with additional configuration.
Examples:
>>> @task(config='my.config')
... def my_task_function():
...     pass
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | str | The name of the configuration module to use for this task. If not given, the default configuration is used. | None |
chain | str | The name of the advice chain to use for advising this task. If not given, 'default' is used. | None |
*task_args | | Positional args given to the decorator OR the decorated function. If the decorator is used WITHOUT providing additional configuration, task_args contains a tuple with a single item that is the function to be used as a task. If there is additional configuration given, task_args contains the positional arguments of the call of the decorator. | () |
**task_kwargs | | Keyword args given to the decorator. If the decorator is used WITHOUT providing additional configuration, task_kwargs is an empty dictionary. If there is additional configuration given, task_kwargs contains the keyword arguments of the call of the decorator. | {} |
Returns:
Type | Description |
---|---|
Callable | Returns a callable that wraps the decorated function. |
Exceptions:
Type | Description |
---|---|
ModuleNotFoundError | If the configured configuration module does not exist. |
ValueError | If the specified advice chain does not exist. |
RuntimeError | If the task is configured with multiple positional arguments. |
Source code in bandsaw/decorator.py
def task(*task_args, config=None, chain=None, **task_kwargs):
    """
    Decorator that is used to define a function as a task.

    The decorator can be used in two different ways, standalone:

    Example:
        >>> @task
        ... def my_task_function():
        ...     pass

    or with additional configuration.

    Example:
        >>> @task(config='my.config')
        ... def my_task_function():
        ...     pass

    Args:
        config (str): The name of the configuration module to use for this task.
            If not given, the default configuration is used.
        chain (str): The name of the advice chain to use for advising this task.
            If not given, 'default' is used.
        *task_args: Positional args given to the decorator OR the decorated function.
            If the decorator is used WITHOUT providing additional configuration,
            `task_args` contains a tuple with a single item that is the function to be
            used as a task. If there is additional configuration given, `task_args`
            contains the positional arguments of the call of the decorator.
        **task_kwargs: Keyword args given to the decorator.
            If the decorator is used WITHOUT providing additional configuration,
            `task_kwargs` is an empty dictionary. If there is additional configuration
            given, `task_kwargs` contains the keyword arguments of the call of the
            decorator.

    Returns:
        Callable: Returns a callable that wraps the decorated function.

    Raises:
        ModuleNotFoundError: If the configured configuration module does not exist.
        ValueError: If the specified advice chain does not exist.
        RuntimeError: If the task is configured with multiple positional arguments.
    """
    config_module = config
    configuration = get_configuration(config_module)
    chain_name = chain or 'default'
    advice_chain = configuration.get_advice_chain(chain_name)
    if advice_chain is None:
        raise ValueError(f"Unknown advice chain {chain_name}")

    def decorate_function(func):
        logger.info("Decorate function '%s'", func)
        logger.info("Creating task for function '%s'", func)
        the_task = Task.create_task(func, task_kwargs)

        def inner(*func_args, **func_kwargs):
            execution_id = _calculate_execution_id(
                func_args,
                func_kwargs,
                configuration.serializer,
            )
            execution = Execution(execution_id, func_args, func_kwargs)
            result = advise_task_with_chain(
                the_task,
                execution,
                configuration,
                chain_name,
            )
            if result.exception:
                raise result.exception
            return result.value

        inner.__wrapped__ = func
        inner.bandsaw_task = the_task
        inner.bandsaw_configuration = configuration
        return inner

    if len(task_args) == 1 and len(task_kwargs) == 0:
        return decorate_function(task_args[0])
    if len(task_args) == 0 and (
        len(task_kwargs) > 0 or chain is not None or config is not None
    ):
        return decorate_function
    # This shouldn't happen if the decorator is properly used.
    raise RuntimeError("Invalid 'task' decorator.")
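The two usage styles are told apart purely by the arguments the decorator receives. The following is a hypothetical, bandsaw-free sketch of just that dispatch logic: the configuration lookup, Task creation and advice chain are left out, and the attributes sketch_config and sketch_chain are invented for the demo so the chosen settings can be inspected.

```python
import functools


def task(*task_args, config=None, chain=None, **task_kwargs):
    """Hypothetical sketch of the dispatch in bandsaw's `task` decorator."""
    chain_name = chain or 'default'

    def decorate_function(func):
        @functools.wraps(func)  # also sets inner.__wrapped__ = func
        def inner(*func_args, **func_kwargs):
            # The real decorator advises the call; here we just call through.
            return func(*func_args, **func_kwargs)

        inner.sketch_config = config
        inner.sketch_chain = chain_name
        return inner

    # Bare usage, @task: Python passes the function as the only positional arg.
    if len(task_args) == 1 and len(task_kwargs) == 0:
        return decorate_function(task_args[0])
    # Parameterised usage, @task(config=...): return the actual decorator.
    if len(task_args) == 0 and (
        len(task_kwargs) > 0 or chain is not None or config is not None
    ):
        return decorate_function
    raise RuntimeError("Invalid 'task' decorator.")


@task
def add(a, b):
    return a + b


@task(config='my.config', chain='remote')
def multiply(a, b):
    return a * b


print(add(2, 3), add.sketch_chain)  # 5 default
print(multiply(2, 3), multiply.sketch_config)  # 6 my.config
```

Note that an empty call such as @task() matches neither branch and ends in the RuntimeError, just like the original.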
distribution
Contains functions for creating distribution archives.
Distribution archives are how bandsaw transfers code between different machines. They are ordinary zip files that contain bandsaw itself, a main module that allows executing the archive and continuing sessions, and possibly some additional dependencies.
DistributionArchive
Class that represents a distribution archive.
A distribution archive contains all the code necessary for running a task. It can be used for running a task on a different machine by copying over the archive.
Attributes:
Name | Type | Description |
---|---|---|
path | pathlib.Path | The path to the file containing the code. |
modules | tuple[str] | The names of the modules that are included in this archive. |
Source code in bandsaw/distribution.py
class DistributionArchive:
    """
    Class that represents a distribution archive.

    A distribution archive contains all the code necessary for running a task. It can
    be used for running a task on a different machine by copying over the archive.

    Attributes:
        path (pathlib.Path): The path to the file containing the code.
        modules (tuple[str]): The names of the modules that are included in this
            archive.
    """

    def __init__(self, path, *modules):
        self._path = path
        self.modules = modules

    @property
    def path(self):
        """
        Returns:
            pathlib.Path: The path to the archive file. The file itself is created
                lazily, when the path is accessed the first time. This ensures
                the archive is only created if necessary.
        """
        if not self._path.exists():
            _create_distribution_archive(self._path, self.modules)
        return self._path

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self.modules == other.modules

    def __hash__(self):
        return hash(self.modules)
path (property, readonly)
Returns:
Type | Description |
---|---|
pathlib.Path | The path to the archive file. The file itself is created lazily, when the path is accessed the first time. This ensures the archive is only created if necessary. |
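The lazy creation behind the path property can be sketched with the standard zipfile module. LazyArchive is a hypothetical stand-in: its _create() method only writes an empty .py entry per module name and is not bandsaw's _create_distribution_archive(), whose contents are not shown in this section. The module name my_config is made up, and build_count exists only to make the laziness observable.

```python
import pathlib
import tempfile
import zipfile


class LazyArchive:
    """Hypothetical stand-in: builds a zip file on first access of `path`."""

    def __init__(self, path, *modules):
        self._path = pathlib.Path(path)
        self.modules = modules
        self.build_count = 0  # demo only: counts how often the file is built

    def _create(self):
        # Placeholder for the real archive builder: one empty entry per module.
        with zipfile.ZipFile(self._path, 'w') as archive:
            for name in self.modules:
                archive.writestr(f'{name}.py', '')
        self.build_count += 1

    @property
    def path(self):
        # Build only if the file doesn't exist yet, as DistributionArchive does.
        if not self._path.exists():
            self._create()
        return self._path


workdir = pathlib.Path(tempfile.mkdtemp())
archive = LazyArchive(workdir / 'distribution.pyz', '__main__', 'my_config')
print(archive._path.exists())  # False: nothing has been built yet
with zipfile.ZipFile(archive.path) as built:  # first access builds the file
    print(built.namelist())  # ['__main__.py', 'my_config.py']
archive.path  # second access reuses the existing file
print(archive.build_count)  # 1
```

Deferring the build means a configuration that never runs a task remotely never pays the cost of zipping its modules.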
get_distribution_archive(configuration)
Returns a distribution archive for a given configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
configuration | bandsaw.config.Configuration | The configuration for which the distribution package should be returned. | required |
Returns:
Type | Description |
---|---|
bandsaw.distribution.DistributionArchive | The archive for the configuration. |
Source code in bandsaw/distribution.py
def get_distribution_archive(configuration):
    """
    Returns a distribution archive for a given configuration.

    Args:
        configuration (bandsaw.config.Configuration): The configuration for which the
            distribution package should be returned.

    Returns:
        bandsaw.distribution.DistributionArchive: The archive for the configuration.
    """
    archive = _CACHE.get_archive(configuration)
    if archive is None:
        archive_path = pathlib.Path(
            tempfile.mktemp(suffix='.pyz', prefix='distribution-')
        )
        modules = [
            '__main__',
            'bandsaw',
            configuration.module_name,
            *configuration.distribution_modules,
        ]
        archive = DistributionArchive(archive_path, *modules)
        _CACHE.put_archive(configuration, archive)
    return archive
execution
Contains classes and functions related to the execution of a task.
Execution (SerializableValue)
Class that defines an execution of a Task.
It contains the arguments that should be used for the task and a unique identifier derived from those arguments.
Attributes:
Name | Type | Description |
---|---|---|
execution_id | str | A string identifying this execution. |
args | tuple[Any] | The positional arguments for the task to use in this execution. |
kwargs | Dict[Any,Any] | The keyword arguments for the task to use in this execution. |
Source code in bandsaw/execution.py
class Execution(SerializableValue):
    """
    Class that defines an execution of a `Task`.

    It contains the arguments that should be used for the task and a unique
    identifier derived from those arguments.

    Attributes:
        execution_id (str): A string identifying this execution.
        args (tuple[Any]): The positional arguments for the task to use in this
            execution.
        kwargs (Dict[Any,Any]): The keyword arguments for the task to use in this
            execution.
    """

    def __init__(self, execution_id, args=None, kwargs=None):
        self.execution_id = execution_id
        self.args = args or ()
        self.kwargs = kwargs or {}

    def serialized(self):
        return {
            'execution_id': self.execution_id,
            'args': self.args,
            'kwargs': self.kwargs,
        }

    @classmethod
    def deserialize(cls, values):
        return Execution(
            values['execution_id'],
            values['args'],
            values['kwargs'],
        )
deserialize(values) (classmethod)
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/execution.py
@classmethod
def deserialize(cls, values):
    return Execution(
        values['execution_id'],
        values['args'],
        values['kwargs'],
    )
serialized(self)
Returns a serializable representation of the value.
Source code in bandsaw/execution.py
def serialized(self):
    return {
        'execution_id': self.execution_id,
        'args': self.args,
        'kwargs': self.kwargs,
    }
extensions
Contains an API for extensions that can be used in bandsaw
Extension
Class that defines the interface of extensions.
An extension can define different callbacks that are called by bandsaw and allow it to extend some existing functionality (e.g. by setting additional values in a context before it is handled by all advices) or to integrate other systems.
Unlike an Advice, an Extension is defined globally in a config and therefore applies to all tasks.
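A custom extension subclasses Extension and overrides only the callbacks it needs (on_init, on_session_created, on_session_finished, as listed in the source below). The sketch is hypothetical: it uses a local stand-in for the base class so it runs without bandsaw, keys sessions by id() because the Session attributes are not documented in this section, and invokes the hooks by hand in the order the docstrings describe. How an extension is registered on a Configuration is not shown here, so that step is left out.

```python
import time
import types


class Extension:
    """Stand-in for bandsaw.extensions.Extension: every callback is a no-op."""

    def on_init(self, configuration):
        pass

    def on_session_created(self, session):
        pass

    def on_session_finished(self, session):
        pass


class TimingExtension(Extension):
    """Hypothetical extension that measures how long each session takes."""

    def __init__(self):
        self._started = {}
        self.durations = []

    def on_session_created(self, session):
        # Called before any advice is applied.
        self._started[id(session)] = time.perf_counter()

    def on_session_finished(self, session):
        # Called after all advices ran and the final result is available.
        started = self._started.pop(id(session))
        self.durations.append(time.perf_counter() - started)


# Simulate the callback order; a SimpleNamespace stands in for a Session.
extension = TimingExtension()
session = types.SimpleNamespace()
extension.on_session_created(session)
extension.on_session_finished(session)
print(len(extension.durations))  # 1
```

Since the base class methods do nothing, a subclass that only cares about one hook can ignore the others entirely.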
Source code in bandsaw/extensions.py
class Extension:
    """
    Class that defines the interface of extensions.

    An extension can define different callbacks that are called by bandsaw and allow
    it to extend some existing functionality (e.g. by setting additional values in a
    context before it is handled by all advices) or to integrate other systems.

    Unlike an `Advice`, an `Extension` is defined globally in a config and therefore
    applies to all tasks.
    """

    def on_init(self, configuration):
        """
        Called when a bandsaw configuration has been initialized.

        Args:
            configuration (bandsaw.config.Configuration): The configuration object
                which contains the config that has been loaded.
        """

    def on_session_created(self, session):
        """
        Called before bandsaw advises a task.

        This is called before any advice is applied.

        Args:
            session (bandsaw.session.Session): The new session.
        """

    def on_session_finished(self, session):
        """
        Called after bandsaw advised a task.

        This is called after all advices have been applied and the final result is
        available.

        Args:
            session (bandsaw.session.Session): The session.
        """
on_init(self, configuration)
Called when a bandsaw configuration has been initialized.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
configuration | bandsaw.config.Configuration | The configuration object which contains the config that has been loaded. | required |
Source code in bandsaw/extensions.py
def on_init(self, configuration):
    """
    Called when a bandsaw configuration has been initialized.

    Args:
        configuration (bandsaw.config.Configuration): The configuration object
            which contains the config that has been loaded.
    """
on_session_created(self, session)
Called before bandsaw advises a task.
This is called before any advice is applied.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The new session. | required |
Source code in bandsaw/extensions.py
def on_session_created(self, session):
    """
    Called before bandsaw advises a task.

    This is called before any advice is applied.

    Args:
        session (bandsaw.session.Session): The new session.
    """
on_session_finished(self, session)
Called after bandsaw advised a task.
This is called after all advices have been applied and the final result is available.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session. | required |
Source code in bandsaw/extensions.py
def on_session_finished(self, session):
    """
    Called after bandsaw advised a task.

    This is called after all advices have been applied and the final result is
    available.

    Args:
        session (bandsaw.session.Session): The session.
    """
identifier
Functions for generating identifiers for arbitrary Python objects.
identifier_from_bytes(buffer)
Derive an identifier from a byte buffer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
buffer | Union[bytes,bytearray] | The binary data from which to derive an identifier. | required |
Returns:
Type | Description |
---|---|
str | The identifier as a string of hexadecimal digits. |
Source code in bandsaw/identifier.py
def identifier_from_bytes(buffer):
    """
    Derive an identifier from a byte buffer.

    Args:
        buffer (Union[bytes,bytearray]): The binary data from which to derive an
            identifier.

    Returns:
        str: The identifier as a string of hexadecimal digits.
    """
    identifier = hashlib.sha256(buffer).hexdigest()[:_ID_LENGTH]
    return identifier
identifier_from_string(string)
Derive an identifier from a string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
string | str | The string from which to derive an identifier. | required |
Returns:
Type | Description |
---|---|
str | The identifier as a string of hexadecimal digits. |
Source code in bandsaw/identifier.py
def identifier_from_string(string):
    """
    Derive an identifier from a string.

    Args:
        string (str): The string from which to derive an identifier.

    Returns:
        str: The identifier as a string of hexadecimal digits.
    """
    identifier = identifier_from_bytes(string.encode('utf-8'))
    return identifier
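Both functions reduce to a truncated SHA-256 hex digest, so equal input bytes always give equal identifiers. The sketch below reproduces them with hashlib; the real value of _ID_LENGTH is not shown in this section, so 16 is only a placeholder assumption.

```python
import hashlib

# Assumption: placeholder length; bandsaw's actual _ID_LENGTH is not shown here.
_ID_LENGTH = 16


def identifier_from_bytes(buffer):
    # SHA-256 over the raw bytes, keeping only the first _ID_LENGTH hex digits.
    return hashlib.sha256(buffer).hexdigest()[:_ID_LENGTH]


def identifier_from_string(string):
    # Strings are hashed via their UTF-8 encoding.
    return identifier_from_bytes(string.encode('utf-8'))


a = identifier_from_string('my-workflow')
b = identifier_from_bytes(bytearray(b'my-workflow'))
print(a == b, len(a))  # True 16
```

Truncating the digest keeps identifiers short enough for file names and logs, at the cost of a higher (though still small for typical workloads) chance of collisions than the full 64-digit digest.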
infos
Information about values, arguments and tasks.
value_info(value)
Information about a value.
The information contains a string representation and a type name, but additional information can be included as well.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value for which the information should be returned. | required |
Returns:
Type | Description |
---|---|
Dict[str,str] | A dictionary containing the information about the value. |
Source code in bandsaw/infos.py
def value_info(value):
    """
    Information about a value.

    The information contains a string representation and a type name, but additional
    information can be included as well.

    Args:
        value (Any): The value for which the information should be returned.

    Returns:
        Dict[str,str]: A dictionary containing the information about the value.
    """
    value_type = type(value)
    if isinstance(value, set):
        value = sorted(value)
    string_value = str(value)
    if len(string_value) > 100:
        string_value = string_value[:85] + '...' + string_value[-12:]
    info = {
        'type': value_type.__qualname__,
        'value': string_value,
    }
    if hasattr(value, '__len__'):
        info['size'] = str(len(value))
    if hasattr(value, 'info'):
        info.update(value.info())
    return info
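The rules in value_info() are easiest to see on a few concrete values. The function below is copied from the source above so the snippet runs on its own; the Model class is a made-up example of a value that supplies extra entries through an info() method.

```python
def value_info(value):
    # Copy of bandsaw.infos.value_info.
    value_type = type(value)
    if isinstance(value, set):
        value = sorted(value)  # sets are sorted for a stable representation
    string_value = str(value)
    if len(string_value) > 100:
        # 85 leading chars + '...' + 12 trailing chars = exactly 100 chars
        string_value = string_value[:85] + '...' + string_value[-12:]
    info = {
        'type': value_type.__qualname__,
        'value': string_value,
    }
    if hasattr(value, '__len__'):
        info['size'] = str(len(value))
    if hasattr(value, 'info'):
        info.update(value.info())
    return info


class Model:
    """Hypothetical value type that contributes its own info entries."""

    def __str__(self):
        return 'Model()'

    def info(self):
        return {'params': '3'}


print(value_info({3, 1, 2}))  # {'type': 'set', 'value': '[1, 2, 3]', 'size': '3'}
long_info = value_info('x' * 200)
print(len(long_info['value']), long_info['size'])  # 100 200
print(value_info(42))  # {'type': 'int', 'value': '42'}
print(value_info(Model()))  # {'type': 'Model', 'value': 'Model()', 'params': '3'}
```

The type name is taken before a set is converted to a sorted list, so sets are still reported as 'set'; 'size' is only present for values that define __len__.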
interpreter
Contains classes for representing Python interpreters.
Interpreter (SerializableValue)
Class for representing different python interpreters.
This class holds the information about specific Python interpreters that are used within the library. To support multiple different interpreters, there will be an option to define the interpreter as part of the config. Currently, only a single interpreter is defined automatically.
Source code in bandsaw/interpreter.py
class Interpreter(SerializableValue):
    """
    Class for representing different python interpreters.

    This class is used to contain the information about specific python interpreters
    that are used within the library. In order to support multiple different
    interpreters there will be the option to define the interpreter as part of config.
    Currently only a single interpreter is automatically defined.
    """

    def __init__(
        self,
        path=None,
        executable=None,
    ):
        """
        Create a new interpreter instance.

        Args:
            path (List[str]): A list of directory paths to be used as $PYTHONPATH.
                If `None` the current `sys.path` is used.
            executable (str): The path to the python executable for this interpreter.
                If `None` the current `sys.executable` is used.
        """
        if path is None:
            self._path = tuple(sys.path)
        else:
            self._path = tuple(path)
        self.executable = executable or sys.executable
        self._environment = {}

    def set_environment(self, **environment):
        """
        Set the environment variables to use for this interpreter.

        A call to this method overwrites all variables that have been set previously.

        Args:
            **environment: Arbitrary keyword arguments where the name of the keyword
                corresponds to the name of the environment variable and the values
                will be the values set in the environment.
        """
        self._environment = environment
        return self

    @property
    def environment(self):
        """The environment variables to be set for the interpreter."""
        return dict(self._environment)

    @property
    def path(self):
        """The python path items that will be used."""
        return tuple(self._path)

    def serialized(self):
        return {
            'path': self._path,
            'executable': self.executable,
            'environment': self._environment,
        }

    @classmethod
    def deserialize(cls, values):
        return Interpreter(
            path=values['path'],
            executable=values['executable'],
        ).set_environment(**values['environment'])
environment (property, readonly)
The environment variables to be set for the interpreter.
path (property, readonly)
The Python path items that will be used.
__init__(self, path=None, executable=None) (special)
Create a new interpreter instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | List[str] | A list of directory paths to be used as $PYTHONPATH. If None, the current sys.path is used. | None |
executable | str | The path to the Python executable for this interpreter. If None, the current sys.executable is used. | None |
Source code in bandsaw/interpreter.py
def __init__(
    self,
    path=None,
    executable=None,
):
    """
    Create a new interpreter instance.

    Args:
        path (List[str]): A list of directory paths to be used as $PYTHONPATH.
            If `None` the current `sys.path` is used.
        executable (str): The path to the python executable for this interpreter.
            If `None` the current `sys.executable` is used.
    """
    if path is None:
        self._path = tuple(sys.path)
    else:
        self._path = tuple(path)
    self.executable = executable or sys.executable
    self._environment = {}
deserialize(values) (classmethod)
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/interpreter.py
@classmethod
def deserialize(cls, values):
    return Interpreter(
        path=values['path'],
        executable=values['executable'],
    ).set_environment(**values['environment'])
serialized(self)
Returns a serializable representation of the value.
Source code in bandsaw/interpreter.py
def serialized(self):
    return {
        'path': self._path,
        'executable': self.executable,
        'environment': self._environment,
    }
set_environment(self, **environment)
Set the environment variables to use for this interpreter.
A call to this method overwrites all variables that have been set previously.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**environment | | Arbitrary keyword arguments where the name of the keyword corresponds to the name of the environment variable and the values will be the values set in the environment. | {} |
Source code in bandsaw/interpreter.py
def set_environment(self, **environment):
    """
    Set the environment variables to use for this interpreter.

    A call to this method overwrites all variables that have been set previously.

    Args:
        **environment: Arbitrary keyword arguments where the name of the keyword
            corresponds to the name of the environment variable and the values
            will be the values set in the environment.
    """
    self._environment = environment
    return self
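To make the role of path, executable and environment concrete, the sketch below launches a child process the way an interpreter description like this could be used. It is an assumption, not bandsaw's launcher (which is not part of this section): it starts from the current os.environ, joins the path items into PYTHONPATH with os.pathsep, and adds the extra variables on top. The directory and the MY_SETTING variable are hypothetical.

```python
import os
import subprocess
import sys

# Hypothetical values mirroring Interpreter.path, .executable and .environment.
interpreter_path = ('/opt/project/src',)  # made-up directory
interpreter_executable = sys.executable
interpreter_environment = {'MY_SETTING': 'on'}  # made-up variable


def build_environment(path, environment):
    """Combine the current environment, PYTHONPATH and extra variables."""
    env = dict(os.environ)
    env['PYTHONPATH'] = os.pathsep.join(path)
    env.update(environment)
    return env


completed = subprocess.run(
    [
        interpreter_executable,
        '-c',
        'import os; print(os.environ["MY_SETTING"], os.environ["PYTHONPATH"])',
    ],
    env=build_environment(interpreter_path, interpreter_environment),
    capture_output=True,
    text=True,
    check=True,
)
print(completed.stdout.strip())  # on /opt/project/src
```

Because set_environment() replaces the whole mapping rather than merging, all variables for an interpreter have to be passed in a single call.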
io
Contains classes related to input/output of data.
BytearrayGeneratorToStream (RawIOBase)
Stream that reads from a generator that yields bytes/bytearrays.
Source code in bandsaw/io.py
class BytearrayGeneratorToStream(io.RawIOBase):
    """
    Stream that reads from a generator that yields bytes/bytearrays.
    """

    def __init__(self, generator):
        self._generator = generator
        self._source = None
        self._source_position = 0
        super().__init__()

    def readinto(self, buffer):
        buffer_size = len(buffer)
        position = 0
        generator_empty = False
        while True:
            if self._source is None:
                self._source = next(self._generator, None)
                self._source_position = 0
                if self._source is None:
                    logger.debug("no new bytearray from generator, ending stream")
                    generator_empty = True
                else:
                    logger.debug(
                        "got new bytearray from generator of size %d",
                        len(self._source),
                    )
            if self._source is not None:
                source_bytes_left = len(self._source) - self._source_position
                buffer_free = buffer_size - position
                bytes_to_copy = min(buffer_free, source_bytes_left)
                logger.debug(
                    "copy %d bytes from current value to stream via buffer of size %d",
                    bytes_to_copy,
                    buffer_size,
                )
                buffer_end = position + bytes_to_copy
                source_end = self._source_position + bytes_to_copy
                buffer[position:buffer_end] = self._source[
                    self._source_position : source_end
                ]
                position += bytes_to_copy
                if bytes_to_copy < source_bytes_left:
                    self._source_position += bytes_to_copy
                else:
                    self._source = None
            buffer_full = position == buffer_size
            if buffer_full or generator_empty:
                break
        return position
read_stream_to_generator(stream, buffer_size=8192)
Read from a stream into a generator yielding bytes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | io.Stream | The stream to read bytes from. | required |
buffer_size | int | The buffer size to read. Each individual bytes buffer returned by the generator has a maximum size of buffer_size. Defaults to 8192 if not set. | 8192 |
Yields:
Type | Description |
---|---|
bytes | A byte buffer of maximum size of buffer_size. |
Source code in bandsaw/io.py
def read_stream_to_generator(stream, buffer_size=8192):
    """
    Read from a stream into a generator yielding `bytes`.

    Args:
        stream (io.Stream): The stream to read bytes from.
        buffer_size (int): The buffer size to read. Each individual `bytes` buffer
            returned by the generator has a maximum size of `buffer_size`. Defaults
            to `8192` if not set.

    Yields:
        bytes: A byte buffer of maximum size of `buffer_size`.
    """
    buffer = bytearray(buffer_size)
    while True:
        read = stream.readinto(buffer)
        if read:
            yield buffer[:read]
        else:
            break
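The chunking behaviour of read_stream_to_generator() can be checked against an in-memory stream. The function is copied from the source above; io.BytesIO stands in for a file or socket, and a deliberately tiny buffer_size makes the split visible. Each yielded chunk is a bytearray slice, so it is a copy that later reads into the shared buffer do not overwrite.

```python
import io


def read_stream_to_generator(stream, buffer_size=8192):
    # Copy of bandsaw.io.read_stream_to_generator.
    buffer = bytearray(buffer_size)
    while True:
        read = stream.readinto(buffer)  # 0 signals end of stream
        if read:
            # Slicing copies the data, so the buffer can be reused safely.
            yield buffer[:read]
        else:
            break


chunks = list(read_stream_to_generator(io.BytesIO(b'abcdefghij'), buffer_size=4))
print([bytes(chunk) for chunk in chunks])  # [b'abcd', b'efgh', b'ij']
print(type(chunks[0]).__name__)  # bytearray
```

Only the last chunk can be shorter than buffer_size, which keeps memory use bounded regardless of the stream's total length.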
modules
Utility functions for handling python modules.
get_loaded_module_name_by_path(file_path)
Determine the name of an already loaded module by its file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path | str | File path of the Python file containing the module. | required |
Returns:
Type | Description |
---|---|
str | The name of the module, or None if the file isn't loaded as a module. |
Source code in bandsaw/modules.py
def get_loaded_module_name_by_path(file_path):
    """
    Determine the name of an already loaded module by its file path.

    Args:
        file_path (str): File path of the python file containing the module.

    Returns:
        str: The name of the module, or `None` if the file isn't loaded as a module.
    """
    real_path = os.path.realpath(file_path)
    for name, module in sys.modules.items():
        if hasattr(module, '__file__'):
            module_path = os.path.realpath(module.__file__)
            if module_path == real_path:
                return name
    return None
import_object(object_name, module_name)
Import a python object from a module.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
object_name | str | The name under which the object is defined in the module. | required |
module_name | str | The name of the module from which the object should be imported. | required |
Returns:
Type | Description |
---|---|
object | The Python object defined under the name. |
Exceptions:
Type | Description |
---|---|
AttributeError | If nothing is defined with name object_name in the referenced module. |
ModuleNotFoundError | If no module exists with the name module_name. |
Source code in bandsaw/modules.py
def import_object(object_name, module_name):
    """
    Import a python object from a module.

    Args:
        object_name (str): The name under which the object is defined in the module.
        module_name (str): The name of the module from which the object should be
            imported.

    Returns:
        object: The python object defined under the name.

    Raises:
        AttributeError: If nothing is defined with name `object_name` in the
            referenced module.
        ModuleNotFoundError: If no module exists with the name `module_name`.
    """
    module = importlib.import_module(module_name)
    return getattr(module, object_name)
object_as_import(obj)
Returns the name and module of an object, which can be used for importing it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj | object | An arbitrary Python object. | required |
Returns:
Type | Description |
---|---|
Tuple(str, str) | Returns a tuple of the object name and the module name in which the object is defined. |
Exceptions:
Type | Description |
---|---|
ValueError | If obj doesn't have a name that can be directly imported, e.g. because it is defined within a function. |
Note
If obj is defined within the __main__ script, the function tries to determine a name for the __main__ module under which it could be imported from other scripts.
Source code in bandsaw/modules.py
def object_as_import(obj):
    """
    Returns the name and module of an object, which can be used for importing it.

    Args:
        obj (object): An arbitrary Python object.

    Returns:
        Tuple(str, str): Returns a tuple of the object name and the module name in
            which the object is defined.

    Raises:
        ValueError: If `obj` doesn't have a name that can be directly imported, e.g.
            because it is defined within a function.

    Note:
        If `obj` is defined within the `__main__` script, the function tries to
        determine a name for the `__main__` module, under which it could be imported
        from other scripts.
    """
    object_name = obj.__name__
    module_name = obj.__module__
    if module_name == '__main__':
        module_file_path = sys.modules['__main__'].__file__
        module_name = _guess_module_name_by_path(module_file_path, sys.path)
    if '<locals>' in obj.__qualname__:
        raise ValueError("Can't import local functions.")
    return object_name, module_name
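The two helpers work as a pair: object_as_import() turns an object into a (name, module) pair and import_object() turns that pair back into the object. The sketch copies import_object() and isolates the '<locals>' test that object_as_import() applies; is_importable_by_name and make_local are hypothetical names used only for the demo.

```python
import importlib


def import_object(object_name, module_name):
    # Same two steps as bandsaw.modules.import_object.
    module = importlib.import_module(module_name)
    return getattr(module, object_name)


def is_importable_by_name(obj):
    # The check object_as_import() performs before returning a name:
    # anything defined inside a function has '<locals>' in its __qualname__.
    return '<locals>' not in obj.__qualname__


def make_local():
    def helper():
        pass

    return helper


sqrt = import_object('sqrt', 'math')
print(sqrt(9.0))  # 3.0
print(is_importable_by_name(make_local))  # True
print(is_importable_by_name(make_local()))  # False: 'make_local.<locals>.helper'
```

This is why task functions need to be defined at module level: a nested function has no import path that another process could use to load it again.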
result
Contains code for representing the result of tasks.
Result (SerializableValue)
Class to encapsulate the result of a task execution.
Attributes:
Name | Type | Description |
---|---|---|
value | Any | The value that is returned by the task. None if the task raised an exception during execution. |
exception | Exception | The exception that was raised during execution, None if no exception was raised. |
Source code in bandsaw/result.py
class Result(SerializableValue):
    """
    Class to encapsulate the result of a task execution.

    Attributes:
        value (Any): The value that is returned by the task. `None` if the task raised
            an exception during execution.
        exception (Exception): The exception that was raised during execution, `None`
            if no exception was raised.
    """

    def __init__(self, value=None, exception=None):
        self.value = value
        self.exception = exception

    def serialized(self):
        values = {
            "value": self.value,
            "exception": self.exception,
        }
        return values

    @classmethod
    def deserialize(cls, values):
        value = values["value"]
        exception = values["exception"]
        return Result(value=value, exception=exception)

    def __eq__(self, other):
        value_equals = self.value == other.value
        exception_type_equals = isinstance(other.exception, type(self.exception))
        exception_args_equals = getattr(self.exception, "args", None) == getattr(
            other.exception, "args", None
        )
        return value_equals and exception_type_equals and exception_args_equals

    def __hash__(self):
        return hash((self.value, repr(self.exception)))
deserialize(values) (classmethod)
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/result.py
@classmethod
def deserialize(cls, values):
    value = values["value"]
    exception = values["exception"]
    return Result(value=value, exception=exception)
serialized(self)
Returns a serializable representation of the value.
Source code in bandsaw/result.py
def serialized(self):
    values = {
        "value": self.value,
        "exception": self.exception,
    }
    return values
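Result.__eq__ compares exceptions by type and args rather than by identity, which matters because a deserialized exception is never the same object as the original. The snippet below copies the relevant parts of Result (without the SerializableValue base) to show the effect, including one side effect of using isinstance(): the comparison is not symmetric when one exception type is a subclass of the other.

```python
class Result:
    """Local copy of bandsaw.result.Result, without SerializableValue."""

    def __init__(self, value=None, exception=None):
        self.value = value
        self.exception = exception

    def __eq__(self, other):
        value_equals = self.value == other.value
        # Exceptions compare by type and args, not by identity.
        exception_type_equals = isinstance(other.exception, type(self.exception))
        exception_args_equals = getattr(self.exception, "args", None) == getattr(
            other.exception, "args", None
        )
        return value_equals and exception_type_equals and exception_args_equals

    def __hash__(self):
        return hash((self.value, repr(self.exception)))


failed_a = Result(exception=ValueError('bad input'))
failed_b = Result(exception=ValueError('bad input'))
print(ValueError('bad input') == ValueError('bad input'))  # False
print(failed_a == failed_b)  # True
print(failed_a == Result(exception=KeyError('bad input')))  # False
print(Result(value=42) == Result(value=42))  # True

# isinstance() accepts subclasses, so the comparison is asymmetric:
print(Result(exception=Exception('x')) == Result(exception=ValueError('x')))  # True
print(Result(exception=ValueError('x')) == Result(exception=Exception('x')))  # False
```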
run
Functions for managing the run id.
get_run_id()
Returns the run id.
The run id is a unique identifier that is specific to an individual run of a workflow. It stays the same across all task executions and can be used for tracking metrics and for differentiating between runs of the same workflow, in which task_id and execution_id stay the same.
Returns:
Type | Description |
---|---|
str | The unique run id. |
Source code in bandsaw/run.py
def get_run_id():
    """
    Returns the run id.

    The run id is a unique identifier that is specific to an individual run of a
    workflow. It stays the same across all task executions and can be used for
    tracking metrics and differentiating between different runs of the same workflow
    where task_id and execution_id stay the same.

    Returns:
        str: The unique run id.
    """
    if _RUN_ID is None:
        set_run_id(str(uuid.uuid1()))
    return _RUN_ID
set_run_id(run_id)
Sets the run id.
Setting the run id explicitly is usually not necessary. The function is mainly used when task executions are run in a different process, to make sure the run id is consistent with the spawning process, but it can also be used e.g. if an external system provides a unique identifier for a specific workflow run.
When set_run_id(run_id) is used, it must be called before the first tasks are defined.
Exceptions:
Type | Description |
---|---|
RuntimeError | If the run id was already set before. |
Source code in bandsaw/run.py
def set_run_id(run_id):
"""
Sets the run id.
Setting the run id explicitly is usually not necessary. The function is mainly
used when task executions are run in a different process to make sure the run id
is consistent with the spawning process, but it can be used e.g. if an external
system provides a unique identifier for a specific workflow run.
When `set_run_id(run_id)` is used, it must be called before the first tasks
are defined.
Raises:
RuntimeError: If the run id was already set before.
"""
global _RUN_ID # pylint: disable=global-statement
if _RUN_ID is not None:
logger.error("run_id already set to %s when trying to set again", _RUN_ID)
raise RuntimeError("Run ID was already set")
logger.info("Set run_id to %s", run_id)
_RUN_ID = run_id
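The two functions interact in a way that explains the "call it first" rule: get_run_id() lazily creates a uuid1-based id the first time it is called, and set_run_id() refuses to overwrite an id that already exists. The following is a standalone sketch that reproduces that logic without bandsaw or logging; the id strings are placeholders.

```python
import uuid

# Module-level state, mirroring the _RUN_ID global shown above.
_RUN_ID = None


def set_run_id(run_id):
    """Set the run id once; a second call raises RuntimeError."""
    global _RUN_ID
    if _RUN_ID is not None:
        raise RuntimeError("Run ID was already set")
    _RUN_ID = run_id


def get_run_id():
    """Return the run id, generating a uuid1-based one on first access."""
    if _RUN_ID is None:
        set_run_id(str(uuid.uuid1()))
    return _RUN_ID


# A worker process would set the id it received from its parent
# before anything else reads it.
set_run_id("run-from-parent")
print(get_run_id())  # run-from-parent

try:
    set_run_id("another-id")
except RuntimeError as error:
    print(error)  # Run ID was already set
```

If get_run_id() were called before set_run_id(), the generated id would already be locked in and the explicit call would fail.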
runner
Contains the main() function for continuing sessions from files.
main(args)
Main function that can be used to proceed with a session.
This function reads a session from a file, proceeds with it until it returns and then saves the state of the session to a new file. It is used for running tasks in a separate process or on different machines.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args | tuple[str] | The arguments taken from the command line. | required |
Source code in bandsaw/runner.py
def main(args):
"""
Main function that can be used to proceed with a session.
This function reads a session from a file, proceeds with it until it returns
and then saves the state of the session to a new file. It is used for running
tasks in a separate process or on different machines.
Args:
args (tuple[str]): The arguments taken from the command line.
"""
hostname = os.uname()[1]
log_format = (
f"{{asctime}} {hostname} {{process: >5d}} {{thread: >5d}} "
f"{{name}} {{levelname}}: {{message}}"
)
logging.basicConfig(level=logging.INFO, format=log_format, style='{')
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input_session',
help="The session which should be continued",
required=True,
)
parser.add_argument(
'--output',
dest='output_session',
help="The session after continuation ended",
required=True,
)
parser.add_argument(
'--run-id',
dest='run_id',
help="The run id of the workflow",
required=True,
)
args = parser.parse_args(args=args)
set_run_id(args.run_id)
logger.info("Creating new session")
session = Session()
logger.info("Reading session from %s", args.input_session)
with io.FileIO(args.input_session, mode='r') as stream:
session.restore(stream)
logger.info("Proceeding session")
session.proceed()
logger.info("Writing session with result to %s", args.output_session)
with io.FileIO(args.output_session, mode='w') as stream:
session.save(stream)
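Because main() hands its argument to parser.parse_args(args=args), it expects an argv-style list without the program name (with None, argparse falls back to sys.argv[1:]). How the spawning process actually builds that command line is not shown here. The following sketch only rebuilds the parser from main() to illustrate the command-line contract; the file names and the run id are placeholders.

```python
import argparse


def build_parser():
    """Rebuild the three required options defined in main()."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input_session', required=True,
                        help="The session which should be continued")
    parser.add_argument('--output', dest='output_session', required=True,
                        help="The session after continuation ended")
    parser.add_argument('--run-id', dest='run_id', required=True,
                        help="The run id of the workflow")
    return parser


# Placeholder values; a spawning process would pass its own paths and,
# in particular, its own run id so that both processes share one run id.
args = build_parser().parse_args(
    ['--input', 'session.in', '--output', 'session.out', '--run-id', 'run-42']
)
print(args.input_session, args.output_session, args.run_id)
# session.in session.out run-42
```

All three options are required, so omitting any of them makes argparse print a usage message and exit with status 2.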
serialization
special
Module for the package bandsaw.serialization. Contains all public types.
json
Contains a Serializer that serializes objects to JSON.
JsonSerializer (Serializer)
A Serializer which serializes objects to JSON.
Attributes:
Name | Type | Description |
---|---|---|
value_serializers | List[ValueSerializer] | A list of serializers that are used for serialization of custom types. |
Source code in bandsaw/serialization/json.py
class JsonSerializer(Serializer):
"""
A `Serializer` which serializes objects to JSON.
Attributes:
value_serializers (List[ValueSerializer]): A list of serializers that are used
for serialization of custom types.
"""
def __init__(self):
super().__init__()
self.value_serializers = []
self.value_serializers.append(ExceptionSerializer())
self.value_serializers.append(SerializableValueSerializer())
self.value_serializers.append(TupleSerializer())
def serialize(self, value, stream):
"""
Serialize a value/object into a binary stream.
Args:
value (Any): The object/value to be serialized.
stream (io.Stream): The binary stream where the serialized value is
written.
"""
text_stream = io.TextIOWrapper(stream, encoding='utf-8')
json.dump(
value,
text_stream,
cls=_ExtensibleJSONEncoder,
value_serializers=self.value_serializers,
allow_nan=False,
indent=None,
separators=(',', ':'),
sort_keys=True,
)
text_stream.detach()
def deserialize(self, stream):
"""
Deserialize a value/object from a binary stream.
Args:
stream (io.Stream): The binary stream from where the serialized value is
read.
Returns:
Any: The object/value which was deserialized.
"""
text_stream = io.TextIOWrapper(stream, encoding='utf-8')
result = json.load(
text_stream,
cls=_ExtensibleJSONDecoder,
value_serializers=self.value_serializers,
)
text_stream.detach()
return result
deserialize(self, stream)
Deserialize a value/object from a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | io.Stream | The binary stream from where the serialized value is read. | required |
Returns:
Type | Description |
---|---|
Any | The object/value which was deserialized. |
Source code in bandsaw/serialization/json.py
def deserialize(self, stream):
"""
Deserialize a value/object from a binary stream.
Args:
stream (io.Stream): The binary stream from where the serialized value is
read.
Returns:
Any: The object/value which was deserialized.
"""
text_stream = io.TextIOWrapper(stream, encoding='utf-8')
result = json.load(
text_stream,
cls=_ExtensibleJSONDecoder,
value_serializers=self.value_serializers,
)
text_stream.detach()
return result
serialize(self, value, stream)
Serialize a value/object into a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The object/value to be serialized. | required |
stream | io.Stream | The binary stream where the serialized value is written. | required |
Source code in bandsaw/serialization/json.py
def serialize(self, value, stream):
"""
Serialize a value/object into a binary stream.
Args:
value (Any): The object/value to be serialized.
stream (io.Stream): The binary stream where the serialized value is
written.
"""
text_stream = io.TextIOWrapper(stream, encoding='utf-8')
json.dump(
value,
text_stream,
cls=_ExtensibleJSONEncoder,
value_serializers=self.value_serializers,
allow_nan=False,
indent=None,
separators=(',', ':'),
sort_keys=True,
)
text_stream.detach()
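The JsonSerializer writes text into a binary stream by wrapping it in io.TextIOWrapper and detaching the wrapper afterwards. detach() flushes the wrapper and returns the binary stream without closing it. The following standalone sketch keeps the json.dump options shown above but replaces the private _ExtensibleJSONEncoder/_ExtensibleJSONDecoder with the plain json encoder and decoder, so custom types are not handled. One deliberate difference is that it detaches in a finally block. In the listing above, detach() is skipped when json.dump raises, and in CPython a wrapper that is garbage-collected while still attached usually closes the caller's stream.

```python
import io
import json


def serialize(value, stream):
    """Write value as compact, key-sorted JSON into a binary stream."""
    text_stream = io.TextIOWrapper(stream, encoding='utf-8')
    try:
        json.dump(
            value,
            text_stream,
            allow_nan=False,        # reject NaN/Infinity instead of emitting them
            indent=None,
            separators=(',', ':'),  # no whitespace between tokens
            sort_keys=True,         # deterministic output
        )
    finally:
        # Flush and release the binary stream without closing it.
        text_stream.detach()


def deserialize(stream):
    """Read one JSON document from a binary stream."""
    text_stream = io.TextIOWrapper(stream, encoding='utf-8')
    try:
        return json.load(text_stream)
    finally:
        text_stream.detach()


buffer = io.BytesIO()
serialize({'b': [1, 2], 'a': 1}, buffer)
print(buffer.getvalue())  # b'{"a":1,"b":[1,2]}'
buffer.seek(0)
print(deserialize(buffer))  # {'a': 1, 'b': [1, 2]}
```

Sorted keys and fixed separators make equal values produce byte-identical output.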
pickle
Contains a Serializer which uses pickle for serializing values.
PickleSerializer (Serializer)
A Serializer which serializes objects using pickle.
Source code in bandsaw/serialization/pickle.py
class PickleSerializer(Serializer):
"""A `Serializer` which serializes objects using pickle."""
def serialize(self, value, stream):
"""
Serialize a value/object into a binary stream.
Args:
value (Any): The object/value to be serialized.
stream (io.Stream): The binary stream where the serialized value is
written.
"""
pickle.dump(value, stream)
def deserialize(self, stream):
"""
Deserialize a value/object from a binary stream.
Args:
stream (io.Stream): The binary stream from where the serialized value is
read.
Returns:
Any: The object/value which was deserialized.
"""
return pickle.load(stream)
deserialize(self, stream)
Deserialize a value/object from a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | io.Stream | The binary stream from where the serialized value is read. | required |
Returns:
Type | Description |
---|---|
Any | The object/value which was deserialized. |
Source code in bandsaw/serialization/pickle.py
def deserialize(self, stream):
"""
Deserialize a value/object from a binary stream.
Args:
stream (io.Stream): The binary stream from where the serialized value is
read.
Returns:
Any: The object/value which was deserialized.
"""
return pickle.load(stream)
serialize(self, value, stream)
Serialize a value/object into a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The object/value to be serialized. | required |
stream | io.Stream | The binary stream where the serialized value is written. | required |
Source code in bandsaw/serialization/pickle.py
def serialize(self, value, stream):
"""
Serialize a value/object into a binary stream.
Args:
value (Any): The object/value to be serialized.
stream (io.Stream): The binary stream where the serialized value is
written.
"""
pickle.dump(value, stream)
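The PickleSerializer reads and writes the binary stream directly, so no text wrapper is involved. The following is a standalone copy of its two methods without the Serializer base class, followed by a round trip through an in-memory stream. Unlike plain JSON, pickle preserves tuples and exceptions as they are. pickle.load can execute arbitrary code while loading, so it should only be used on trusted streams.

```python
import io
import pickle


class PickleSerializer:
    """Standalone copy of the two methods shown above."""

    def serialize(self, value, stream):
        pickle.dump(value, stream)

    def deserialize(self, stream):
        return pickle.load(stream)


serializer = PickleSerializer()
buffer = io.BytesIO()
serializer.serialize({'numbers': (1, 2, 3), 'error': ValueError('bad')}, buffer)

buffer.seek(0)  # rewind before reading back
restored = serializer.deserialize(buffer)
print(restored['numbers'])  # (1, 2, 3)
print(repr(restored['error']))  # ValueError('bad')
```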
serializer
Base classes for serializers that serialize Python values.
Serializer (ABC)
Interface for serializers, which serialize objects.
Source code in bandsaw/serialization/serializer.py
class Serializer(abc.ABC):
"""Interface for serializers, which serialize objects."""
@abc.abstractmethod
def serialize(self, value, stream):
"""
Serialize a value/object into a binary stream.
Args:
value (Any): The object/value to be serialized.
stream (io.Stream): The binary stream where the serialized value is
written.
"""
@abc.abstractmethod
def deserialize(self, stream):
"""
Deserialize a value/object from a binary stream.
Args:
stream (io.Stream): The binary stream from where the serialized value is
read.
Returns:
Any: The object/value which was deserialized.
"""
deserialize(self, stream)
Deserialize a value/object from a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | io.Stream | The binary stream from where the serialized value is read. | required |
Returns:
Type | Description |
---|---|
Any | The object/value which was deserialized. |
Source code in bandsaw/serialization/serializer.py
@abc.abstractmethod
def deserialize(self, stream):
"""
Deserialize a value/object from a binary stream.
Args:
stream (io.Stream): The binary stream from where the serialized value is
read.
Returns:
Any: The object/value which was deserialized.
"""
serialize(self, value, stream)
Serialize a value/object into a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The object/value to be serialized. | required |
stream | io.Stream | The binary stream where the serialized value is written. | required |
Source code in bandsaw/serialization/serializer.py
@abc.abstractmethod
def serialize(self, value, stream):
"""
Serialize a value/object into a binary stream.
Args:
value (Any): The object/value to be serialized.
stream (io.Stream): The binary stream where the serialized value is
written.
"""
values
A collection of classes for serializing custom objects.
ExceptionSerializer (ValueSerializer)
A ValueSerializer for serializing exceptions.
The serializer saves only the type and the args attribute of the exception, so it won't work for all exception types, but it should cover most of them. Other attributes of the exception, e.g. the stack trace, are discarded.
Source code in bandsaw/serialization/values.py
class ExceptionSerializer(ValueSerializer):
"""
A ValueSerializer for serializing exceptions.
The serializer saves only the type and the `args` attribute of the exception,
so it won't work for all exception types, but it should cover most of them.
Other attributes of the exception, e.g. the stack trace, are discarded.
"""
def can_serialize_value(self, value):
return isinstance(value, Exception)
def serialize_value(self, value):
state = {
'type': type(value).__name__,
'module': type(value).__module__,
'args': value.args,
}
return state
def deserialize_value(self, representation):
module_name = representation['module']
type_name = representation['type']
module = importlib.import_module(module_name)
value_type = getattr(module, type_name)
return value_type(*representation['args'])
can_serialize_value(self, value)
Returns whether a serializer can serialize a specific value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
boolean | True if this serializer can serialize the given value, otherwise False. |
Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
return isinstance(value, Exception)
deserialize_value(self, representation)
Returns a deserialized value from its serialized representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
representation | Any | The serialized representation of the value. | required |
Returns:
Type | Description |
---|---|
Any | The deserialized value. |
Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
module_name = representation['module']
type_name = representation['type']
module = importlib.import_module(module_name)
value_type = getattr(module, type_name)
return value_type(*representation['args'])
serialize_value(self, value)
Returns a serialized representation of the given value.
The returned representation can use standard python types like primitive values, lists or dicts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
Any | The serialized representation of the value. |
Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
state = {
'type': type(value).__name__,
'module': type(value).__module__,
'args': value.args,
}
return state
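The effect of keeping only the type and args can be shown with a round trip. The sketch below copies the serialize_value and deserialize_value logic as plain functions. The extra note attribute is a hypothetical piece of state that does not survive. The lookup relies on importlib.import_module and getattr, so the exception class must be importable by module name in the deserializing process, and its constructor must accept the saved args positionally.

```python
import importlib


def serialize_value(value):
    # Only the type (name and module) and args are kept.
    return {
        'type': type(value).__name__,
        'module': type(value).__module__,
        'args': value.args,
    }


def deserialize_value(representation):
    module = importlib.import_module(representation['module'])
    value_type = getattr(module, representation['type'])
    return value_type(*representation['args'])


error = KeyError('missing')
error.note = "extra attribute"  # hypothetical extra state
state = serialize_value(error)
print(state)  # {'type': 'KeyError', 'module': 'builtins', 'args': ('missing',)}

restored = deserialize_value(state)
print(type(restored).__name__, restored.args)  # KeyError ('missing',)
print(hasattr(restored, 'note'))  # False
```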
SerializableValue (ABC)
Interface for types that can serialize themselves.
Source code in bandsaw/serialization/values.py
class SerializableValue(abc.ABC):
"""Interface for types that can serialize themselves."""
@abc.abstractmethod
def serialized(self):
"""Returns a serializable representation of the value."""
@classmethod
@abc.abstractmethod
def deserialize(cls, values):
"""Returns a new instance of a value from its serialized representation."""
deserialize(values)
classmethod
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/serialization/values.py
@classmethod
@abc.abstractmethod
def deserialize(cls, values):
"""Returns a new instance of a value from its serialized representation."""
serialized(self)
Returns a serializable representation of the value.
Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def serialized(self):
"""Returns a serializable representation of the value."""
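A class opts into this mechanism by implementing serialized() and the deserialize() classmethod. The Result class at the top of this section follows the same pattern. The sketch below redefines the interface locally and uses a hypothetical Point class. It also builds the state dictionary that SerializableValueSerializer.serialize_value() would produce, then round-trips that state through plain JSON. The sketch calls Point.deserialize directly instead of resolving the type through importlib. The real lookup relies on import_module and getattr, so such classes likely need to live at the top level of an importable module.

```python
import abc
import json


class SerializableValue(abc.ABC):
    """Local copy of the interface shown above."""

    @abc.abstractmethod
    def serialized(self):
        """Returns a serializable representation of the value."""

    @classmethod
    @abc.abstractmethod
    def deserialize(cls, values):
        """Returns a new instance of a value from its serialized representation."""


class Point(SerializableValue):
    """Hypothetical value type used only for this example."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def serialized(self):
        # Plain dicts, lists and numbers keep the representation JSON-compatible.
        return {'x': self.x, 'y': self.y}

    @classmethod
    def deserialize(cls, values):
        return cls(values['x'], values['y'])


point = Point(1, 2)
# Same shape as the state built by SerializableValueSerializer.serialize_value().
state = {
    'type': type(point).__name__,
    'module': type(point).__module__,
    'serialized': point.serialized(),
}
text = json.dumps(state)
restored = Point.deserialize(json.loads(text)['serialized'])
print(restored.x, restored.y)  # 1 2
```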
SerializableValueSerializer (ValueSerializer)
A ValueSerializer for serializing subclasses of SerializableValue.
The serializer uses the methods defined in SerializableValue and implemented by the individual classes to serialize values. It stores the type of the value and its serialized representation, which allows the value to be recreated from this information.
Source code in bandsaw/serialization/values.py
class SerializableValueSerializer(ValueSerializer):
"""
A ValueSerializer for serializing subclasses of `SerializableValue`.
The serializer uses the methods defined in `SerializableValue` and implemented
by the individual classes to serialize values. It stores the type of the value
and its serialized representation, which allows the value to be recreated from
this information.
"""
def can_serialize_value(self, value):
return isinstance(value, SerializableValue)
def serialize_value(self, value):
state = {
'type': type(value).__name__,
'module': type(value).__module__,
'serialized': value.serialized(),
}
return state
def deserialize_value(self, representation):
module_name = representation['module']
type_name = representation['type']
module = importlib.import_module(module_name)
value_type = getattr(module, type_name)
return value_type.deserialize(representation['serialized'])
can_serialize_value(self, value)
Returns whether a serializer can serialize a specific value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
boolean | True if this serializer can serialize the given value, otherwise False. |
Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
return isinstance(value, SerializableValue)
deserialize_value(self, representation)
Returns a deserialized value from its serialized representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
representation | Any | The serialized representation of the value. | required |
Returns:
Type | Description |
---|---|
Any | The deserialized value. |
Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
module_name = representation['module']
type_name = representation['type']
module = importlib.import_module(module_name)
value_type = getattr(module, type_name)
return value_type.deserialize(representation['serialized'])
serialize_value(self, value)
Returns a serialized representation of the given value.
The returned representation can use standard python types like primitive values, lists or dicts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
Any | The serialized representation of the value. |
Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
state = {
'type': type(value).__name__,
'module': type(value).__module__,
'serialized': value.serialized(),
}
return state
TupleSerializer (ValueSerializer)
A ValueSerializer for serializing tuples.
The serializer supports normal tuples as well as named tuples. When named tuples are deserialized, it first tries to reuse an existing namedtuple type. If the type can't be imported or reused, a new namedtuple type with the same name and fields is created on the fly.
Source code in bandsaw/serialization/values.py
class TupleSerializer(ValueSerializer):
"""
A ValueSerializer for serializing tuples.
The serializer supports normal tuples as well as named tuples. When named tuples
are deserialized, it first tries to reuse an existing namedtuple type. If the
type can't be imported or reused, a new namedtuple type with the same name and
fields is created on the fly.
"""
def can_serialize_value(self, value):
return isinstance(value, tuple)
def serialize_value(self, value):
if hasattr(value, '_fields'):
state = {
'type': 'namedtuple',
'fields': list(value._fields),
'name': type(value).__name__,
'module': type(value).__module__,
'items': list(value),
}
else:
state = {
'type': 'tuple',
'items': list(value),
}
return state
def deserialize_value(self, representation):
if representation['type'] == 'namedtuple':
# try to import the namedtuple type
module_name = representation['module']
type_name = representation['name']
try:
module = importlib.import_module(module_name)
tuple_type = getattr(module, type_name)
except (ImportError, AttributeError) as error:
logger.warning(
"Error importing namedtuple, trying to recreate it: %s", error
)
# Recreate a new type
field_names = ' '.join(representation['fields'])
tuple_type = collections.namedtuple(
type_name, field_names, module=module_name
)
return tuple_type(*representation['items'])
return tuple(representation['items'])
can_serialize_value(self, value)
Returns whether a serializer can serialize a specific value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
boolean | True if this serializer can serialize the given value, otherwise False. |
Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
return isinstance(value, tuple)
deserialize_value(self, representation)
Returns a deserialized value from its serialized representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
representation | Any | The serialized representation of the value. | required |
Returns:
Type | Description |
---|---|
Any | The deserialized value. |
Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
if representation['type'] == 'namedtuple':
# try to import the namedtuple type
module_name = representation['module']
type_name = representation['name']
try:
module = importlib.import_module(module_name)
tuple_type = getattr(module, type_name)
except (ImportError, AttributeError) as error:
logger.warning(
"Error importing namedtuple, trying to recreate it: %s", error
)
# Recreate a new type
field_names = ' '.join(representation['fields'])
tuple_type = collections.namedtuple(
type_name, field_names, module=module_name
)
return tuple_type(*representation['items'])
return tuple(representation['items'])
serialize_value(self, value)
Returns a serialized representation of the given value.
The returned representation can use standard python types like primitive values, lists or dicts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
Any | The serialized representation of the value. |
Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
if hasattr(value, '_fields'):
state = {
'type': 'namedtuple',
'fields': list(value._fields),
'name': type(value).__name__,
'module': type(value).__module__,
'items': list(value),
}
else:
state = {
'type': 'tuple',
'items': list(value),
}
return state
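JSON has no tuple type, so without special handling tuples come back as lists. The sketch below copies the TupleSerializer logic as plain functions and exercises its fallback path. It uses a namedtuple whose declared module (the made-up name not_importable_module) cannot be imported. Deserialization therefore recreates an equivalent type with the same name and fields instead of reusing the original one.

```python
import collections
import importlib
import json


def serialize_value(value):
    if hasattr(value, '_fields'):  # namedtuple instances expose _fields
        return {
            'type': 'namedtuple',
            'fields': list(value._fields),
            'name': type(value).__name__,
            'module': type(value).__module__,
            'items': list(value),
        }
    return {'type': 'tuple', 'items': list(value)}


def deserialize_value(representation):
    if representation['type'] == 'namedtuple':
        module_name = representation['module']
        type_name = representation['name']
        try:
            module = importlib.import_module(module_name)
            tuple_type = getattr(module, type_name)
        except (ImportError, AttributeError):
            # Recreate a type with the same name and fields.
            field_names = ' '.join(representation['fields'])
            tuple_type = collections.namedtuple(
                type_name, field_names, module=module_name
            )
        return tuple_type(*representation['items'])
    return tuple(representation['items'])


print(json.loads(json.dumps((1, 2))))  # [1, 2]: plain JSON loses the tuple type

Point = collections.namedtuple('Point', 'x y', module='not_importable_module')
restored = deserialize_value(serialize_value(Point(1, 2)))
print(restored, restored._fields)  # Point(x=1, y=2) ('x', 'y')
print(deserialize_value(serialize_value((3, 4))))  # (3, 4)
```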
ValueSerializer (ABC)
Interface for serializers that can serialize custom values.
Source code in bandsaw/serialization/values.py
class ValueSerializer(abc.ABC):
"""
Interface for serializers that can serialize custom values.
"""
@abc.abstractmethod
def can_serialize_value(self, value):
"""
Returns whether a serializer can serialize a specific value.
Args:
value (Any): The value that should be serialized.
Returns:
boolean: `True` if this serializer can serialize the given value,
otherwise `False`.
"""
@abc.abstractmethod
def serialize_value(self, value):
"""
Returns a serialized representation of the given value.
The returned representation can use standard python types like primitive
values, lists or dicts.
Args:
value (Any): The value that should be serialized.
Returns:
Any: The serialized representation of the value.
"""
@abc.abstractmethod
def deserialize_value(self, representation):
"""
Returns a deserialized value from its serialized representation.
Args:
representation (Any): The serialized representation of the value.
Returns:
Any: The deserialized value.
"""
can_serialize_value(self, value)
Returns whether a serializer can serialize a specific value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
boolean | True if this serializer can serialize the given value, otherwise False. |
Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def can_serialize_value(self, value):
"""
Returns whether a serializer can serialize a specific value.
Args:
value (Any): The value that should be serialized.
Returns:
boolean: `True` if this serializer can serialize the given value,
otherwise `False`.
"""
deserialize_value(self, representation)
Returns a deserialized value from its serialized representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
representation | Any | The serialized representation of the value. | required |
Returns:
Type | Description |
---|---|
Any | The deserialized value. |
Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def deserialize_value(self, representation):
"""
Returns a deserialized value from its serialized representation.
Args:
representation (Any): The serialized representation of the value.
Returns:
Any: The deserialized value.
"""
serialize_value(self, value)
Returns a serialized representation of the given value.
The returned representation can use standard python types like primitive values, lists or dicts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value that should be serialized. | required |
Returns:
Type | Description |
---|---|
Any | The serialized representation of the value. |
Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def serialize_value(self, value):
"""
Returns a serialized representation of the given value.
The returned representation can use standard python types like primitive
values, lists or dicts.
Args:
value (Any): The value that should be serialized.
Returns:
Any: The serialized representation of the value.
"""
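A custom ValueSerializer implements the three abstract methods above. The sketch below redefines the interface locally and adds a hypothetical DateSerializer for datetime.date values, which is not part of bandsaw. Since JsonSerializer exposes value_serializers as a public list, registering such a serializer is presumably done by appending an instance to it. How the private encoder and decoder tag representations is not shown here, so treat that step as an assumption.

```python
import abc
import datetime


class ValueSerializer(abc.ABC):
    """Local copy of the interface shown above."""

    @abc.abstractmethod
    def can_serialize_value(self, value): ...

    @abc.abstractmethod
    def serialize_value(self, value): ...

    @abc.abstractmethod
    def deserialize_value(self, representation): ...


class DateSerializer(ValueSerializer):
    """Hypothetical serializer for datetime.date values."""

    def can_serialize_value(self, value):
        # datetime.datetime subclasses date; exclude it so the time is not lost.
        return (isinstance(value, datetime.date)
                and not isinstance(value, datetime.datetime))

    def serialize_value(self, value):
        return {'iso': value.isoformat()}  # JSON-friendly primitive

    def deserialize_value(self, representation):
        return datetime.date.fromisoformat(representation['iso'])


serializer = DateSerializer()
day = datetime.date(2021, 6, 1)
state = serializer.serialize_value(day)
print(state)  # {'iso': '2021-06-01'}
print(serializer.deserialize_value(state) == day)  # True
print(serializer.can_serialize_value(datetime.datetime(2021, 6, 1, 12, 0)))  # False
```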
session
Contains classes for representing an advising session.
Attachment (ABC)
Class that represents a single file that has been attached to a session.
Source code in bandsaw/session.py
class Attachment(abc.ABC):
"""
Class that represents a single file that has been attached to a session.
"""
@abc.abstractmethod
def open(self):
"""
Opens the attachment for reading.
Returns:
io.RawIOBase: binary stream for reading.
"""
@property
@abc.abstractmethod
def size(self):
"""Return the size of the attachment in bytes"""
size
property
readonly
Return the size of the attachment in bytes
open(self)
Opens the attachment for reading.
Returns:
Type | Description |
---|---|
io.RawIOBase | Binary stream for reading. |
Source code in bandsaw/session.py
@abc.abstractmethod
def open(self):
"""
Opens the attachment for reading.
Returns:
io.RawIOBase: binary stream for reading.
"""
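Any object that provides open() and a size property satisfies this interface. Attachments.store() uses the stream from open() in a with statement and calls read() on it. The sketch below redefines the interface locally and adds a hypothetical in-memory BytesAttachment, which is not part of bandsaw and might be handy in tests. It returns io.BytesIO, which supports read() and the context-manager protocol. BytesIO is a buffered stream, though, not the io.RawIOBase the docstring promises.

```python
import abc
import io


class Attachment(abc.ABC):
    """Local copy of the interface shown above."""

    @abc.abstractmethod
    def open(self):
        """Opens the attachment for reading."""

    @property
    @abc.abstractmethod
    def size(self):
        """Return the size of the attachment in bytes"""


class BytesAttachment(Attachment):
    """Hypothetical in-memory attachment."""

    def __init__(self, content):
        self._content = bytes(content)

    def open(self):
        # A fresh stream per call, positioned at the start.
        return io.BytesIO(self._content)

    @property
    def size(self):
        return len(self._content)


attachment = BytesAttachment(b'My binary file content.')
print(attachment.size)  # 23
with attachment.open() as stream:
    print(stream.read())  # b'My binary file content.'
```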
Attachments (Mapping)
A mapping that contains attachments.
Attachments can only be added, but neither deleted nor overwritten. Their names must be valid file names without directories.
Attachments itself is a mapping class and can be used similarly to a dictionary.
When a new attachment is added, its value must be the path to an existing file, either as a str or a pathlib.Path. When an attachment is accessed, an object of type Attachment is returned, which gives access to the size of the attachment and allows reading its content.
Examples:
>>> attachments = Attachments()
>>> attachments['my.attachment'] = '/path/to/file'
>>> attachment = attachments['my.attachment']
>>> attachment.size
1234
>>> attachment.open().readall()
b'My binary file content.'
Source code in bandsaw/session.py
class Attachments(collections.abc.Mapping):
"""
A mapping that contains attachments.
Attachments can only be added, but neither deleted nor overwritten. Their names
must be valid file names without directories.
Attachments itself is a mapping class and can be used similarly to a dictionary.
When a new attachment is added, its value must be the path to an existing file,
either as `str` or `pathlib.Path`. When an attachment is accessed, an object of
type `Attachment` is returned, which gives access to the size of the attachment
and allows reading its content.
Examples:
>>> attachments = Attachments()
>>> attachments['my.attachment'] = '/path/to/file'
>>> attachment = attachments['my.attachment']
>>> attachment.size
1234
>>> attachment.open().readall()
b'My binary file content.'
"""
def __init__(self, zip_file=None):
"""
Creates a new container for attachments.
Args:
zip_file (zipfile.ZipFile): An already existing zip file, which can be
used for initializing with pre-existing attachments.
"""
self._items = {}
if zip_file is not None:
self._add_attachments_from_zip(zip_file)
def _add_attachments_from_zip(self, zip_file):
for file_path in zip_file.namelist():
if file_path[:12] == 'attachments/':
attachment_name = file_path.split('/', 1)[1]
self._items[attachment_name] = _ZipAttachment(zip_file, file_path)
def store(self, zip_file):
"""
Stores all attachments in a zip file.
Args:
zip_file (zipfile.ZipFile): The zip file where the attachments should be
stored in.
"""
for name, attachment in self._items.items():
with attachment.open() as stream:
zip_file.writestr('attachments/' + name, stream.read())
def __setitem__(self, key, path):
if key in self._items:
raise KeyError(f"Attachment '{key}' does already exist")
if isinstance(path, str):
path = pathlib.Path(path)
if not isinstance(path, pathlib.Path):
raise TypeError("Invalid type for value, must be str or Path")
if not path.exists():
raise ValueError("File does not exist")
if not path.is_file():
raise ValueError("Path is not a file")
self._items[key] = _FileAttachment(path)
def __getitem__(self, key):
return self._items[key]
def __iter__(self):
return iter(self._items)
def __len__(self):
return len(self._items)
__init__(self, zip_file=None)
special
Creates a new container for attachments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
zip_file | zipfile.ZipFile | An already existing zip file, which can be used for initializing with pre-existing attachments. | None |
Source code in bandsaw/session.py
def __init__(self, zip_file=None):
"""
Creates a new container for attachments.
Args:
zip_file (zipfile.ZipFile): An already existing zip file, which can be
used for initializing with pre-existing attachments.
"""
self._items = {}
if zip_file is not None:
self._add_attachments_from_zip(zip_file)
store(self, zip_file)
Stores all attachments in a zip file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
zip_file | zipfile.ZipFile | The zip file where the attachments should be stored. | required |
Source code in bandsaw/session.py
def store(self, zip_file):
"""
Stores all attachments in a zip file.
Args:
zip_file (zipfile.ZipFile): The zip file where the attachments should be
stored in.
"""
for name, attachment in self._items.items():
with attachment.open() as stream:
zip_file.writestr('attachments/' + name, stream.read())
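store() writes each attachment as a zip entry named 'attachments/<name>'. When the constructor receives a zip file, it picks up every entry that starts with that 12-character prefix. The sketch below reproduces this layout using only the standard zipfile module. The attachment names, their contents and the extra session.json entry are hypothetical; the extra entry just shows that non-attachment entries are skipped.

```python
import io
import zipfile

PREFIX = 'attachments/'  # 12 characters, matching the file_path[:12] check

archive = io.BytesIO()
with zipfile.ZipFile(archive, mode='w') as zip_file:
    # One entry per attachment, as store() writes them.
    zip_file.writestr(PREFIX + 'metrics.json', b'{"loss":0.1}')
    zip_file.writestr(PREFIX + 'model.bin', b'\x00\x01')
    # Hypothetical non-attachment entry that must be ignored on reading.
    zip_file.writestr('session.json', b'{}')

with zipfile.ZipFile(archive) as zip_file:
    # Same filter and name extraction as _add_attachments_from_zip().
    found = {
        path.split('/', 1)[1]: path
        for path in zip_file.namelist()
        if path[:12] == PREFIX
    }
    print(sorted(found))  # ['metrics.json', 'model.bin']
    print(zip_file.read(found['model.bin']))  # b'\x00\x01'
```

Passing a file object such as BytesIO to ZipFile keeps it open after the with block, which is why the same buffer can be reopened for reading.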
Ids
Class that encapsulates the ids of a session.
Attributes:
Name | Type | Description |
---|---|---|
task_id | str | The id of the task in this session. |
execution_id | str | The id of the execution of the task in this session. |
run_id | str | The id of the current run. |
session_id | str | The id of the session. The id is a combination of the three other ids. |
Source code in bandsaw/session.py
class Ids:
"""
Class that encapsulates the ids of a session.
Attributes:
task_id (str): The id of the task in this session.
execution_id (str): The id of the execution of the task in this session.
run_id (str): The id of the current run.
session_id (str): The id of the session. The id is a combination of the three
other ids.
"""
slots = ('task_id', 'execution_id', 'run_id', '_session_id')
def __init__(self, task_id, execution_id, run_id):
self.task_id = task_id
self.execution_id = execution_id
self.run_id = run_id
self.session_id = "_".join(
[self.task_id, self.execution_id, self.run_id],
)
def __str__(self):
return self.session_id
def as_path(self):
"""
Returns a relative path derived from the ids.
Returns:
pathlib.Path: relative path that uses the individual ids as components.
"""
return pathlib.Path(self.task_id) / self.execution_id / self.run_id
@classmethod
def from_string(cls, id_as_string):
"""Create new Ids object from its string representation."""
return Ids(*(id_as_string.split('_')))
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
return self.session_id == other.session_id
def __hash__(self):
return hash(self.session_id)
as_path(self)
Returns a relative path derived from the ids.
Returns:
Type | Description |
---|---|
pathlib.Path | Relative path that uses the individual ids as components. |
Source code in bandsaw/session.py
def as_path(self):
"""
Returns a relative path derived from the ids.
Returns:
pathlib.Path: relative path that uses the individual ids as components.
"""
return pathlib.Path(self.task_id) / self.execution_id / self.run_id
from_string(id_as_string)
classmethod
Create new Ids object from its string representation.
Source code in bandsaw/session.py
@classmethod
def from_string(cls, id_as_string):
"""Create new Ids object from its string representation."""
return Ids(*(id_as_string.split('_')))
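The session id is the three ids joined with underscores, and from_string() splits on the same character. The sketch below is a standalone copy of that logic, without __eq__ and __hash__, and uses cls instead of the hard-coded class name. It shows the session id, the derived path, and one consequence of the format: an id that itself contains "_" cannot be parsed back. The id values are placeholders.

```python
import pathlib


class Ids:
    """Standalone copy of the Ids logic shown above."""

    def __init__(self, task_id, execution_id, run_id):
        self.task_id = task_id
        self.execution_id = execution_id
        self.run_id = run_id
        self.session_id = "_".join([task_id, execution_id, run_id])

    def as_path(self):
        return pathlib.Path(self.task_id) / self.execution_id / self.run_id

    @classmethod
    def from_string(cls, id_as_string):
        return cls(*id_as_string.split('_'))


ids = Ids('task1', 'exec1', 'run1')
print(ids.session_id)                           # task1_exec1_run1
print(ids.as_path().as_posix())                 # task1/exec1/run1
print(Ids.from_string(ids.session_id).run_id)   # run1

# 'my_task_exec1_run1' splits into four parts, one too many for __init__.
try:
    Ids.from_string(Ids('my_task', 'exec1', 'run1').session_id)
except TypeError as error:
    print("cannot parse:", error)
```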
Session
Class that handles the advising of an execution.
A Session object is given to the individual advices that are called to advise the execution. By calling the appropriate methods, like proceed() to continue or conclude(result) to end with a result, the advices can influence the final result.
Additionally, the session provides access to the context, which advices can use to keep state, as well as to the advised execution, the configuration used for advising, and the result of the execution.
Attributes:
Name | Type | Description |
---|---|---|
task | bandsaw.tasks.Task | The task that is executed. |
execution | bandsaw.execution.Execution | The execution arguments for the task. |
context | bandsaw.context.Context | The context that can be used for advices to store state. |
result | bandsaw.result.Result | Result of the task if already computed, otherwise None. |
attachments | bandsaw.session.Attachments | A mapping of files that have been attached to the session. |
configuration | bandsaw.config.Configuration | The configuration that is being used for advising this task. |
Source code in bandsaw/session.py
class Session:
    """
    Class that handles the advising of an execution.
    A `Session` object is given to the individual advices that are called to advise
    the execution. By calling the appropriate methods like `proceed()` to
    continue or `conclude()` to end with a result, the advices can influence the final
    result.
    Additionally, the session provides access to the `context`, which allows advices
    to keep state, the `execution` that is advised, the `configuration` that is used
    for advising and the `result` of the execution.
    Attributes:
        task (bandsaw.tasks.Task): The task that is executed.
        execution (bandsaw.execution.Execution): The execution arguments for the task.
        context (bandsaw.context.Context): The context that can be used for advices
            to store state.
        result (bandsaw.result.Result): Result of the task if already computed.
            Otherwise, `None`.
        attachments (bandsaw.session.Attachments): A mapping of files that have been
            attached to the session.
        configuration (bandsaw.config.Configuration): The configuration that is being
            used for advising this task.
    """
    # pylint: disable=too-many-instance-attributes
    # is reasonable in this case.
    def __init__(
        self,
        task=None,
        execution=None,
        configuration=None,
        advice_chain='default',
    ):
        """
        Create a new session.
        """
        self.task = task
        self.execution = execution
        self.context = {}
        self.result = None
        self.attachments = Attachments()
        self.configuration = configuration
        self._advice_chain = advice_chain
        self._moderator = None
        self._ids = None
        self._temp_dir = None
    def initiate(self):
        """
        Start the process of advising an execution.
        Returns:
            bandsaw.result.Result: The final result of the execution after all
                advices.
        """
        self._moderator = _Moderator(
            self.configuration.get_advice_chain(self._advice_chain)
        )
        logger.debug("running extensions before advice")
        for extension in self.configuration.extensions:
            extension.on_session_created(self)
        self.proceed()
        if not self._moderator.is_finished:
            raise RuntimeError(
                f"Not all advice has been applied. "
                f"Misbehaving advice {self._moderator.current_advice}"
            )
        logger.debug("running extensions after advice")
        for extension in self.configuration.extensions:
            extension.on_session_finished(self)
        return self.result
    @property
    def ids(self):
        """The ids of this session."""
        if self._ids is None:
            if self.task is None or self.execution is None:
                raise ValueError("Incomplete session, missing task or execution.")
            self._ids = Ids(
                self.task.task_id,
                self.execution.execution_id,
                self.run_id,
            )
        return self._ids
    @property
    def session_id(self):
        """The id of this session as string."""
        return str(self.ids)
    @property
    def serializer(self):
        """The serializer that can be used for serializing values."""
        return self.configuration.serializer
    @property
    def distribution_archive(self):
        """The DistributionArchive which can be used when transferring the session."""
        return get_distribution_archive(self.configuration)
    @property
    def run_id(self):
        """The run id of the workflow."""
        return get_run_id()
    @property
    def temp_dir(self):
        """
        Temporary directory where session specific files can be written to.
        This directory is meant for storing temporary files, that are used by the
        individual `Advice` instances. The directory is already created and will be
        automatically deleted with the end of the python interpreter, nonetheless,
        the advices writing files to the directory should if possible take care of
        removing them if no longer needed.
        Returns:
            pathlib.Path: Path to the temporary directory.
        """
        if self._temp_dir is None:
            self._temp_dir = self.configuration.temporary_directory / self.ids.as_path()
            self._temp_dir.mkdir(parents=True, exist_ok=True)
        return self._temp_dir
    def proceed(self):
        """
        Continue the process of advising with the next advice.
        """
        self._moderator.next(self)
    def conclude(self, result):
        """
        Conclude the process of advising with a `Result`.
        This can be used in two cases:
        1. Concluding BEFORE the task was actually executed. This will skip all
           subsequent advices defined later in the advice chain and will skip the
           task execution. The given `result` will then be used as preliminary result.
           All advices that are defined before the calling advice in the advice chain
           will still be called with their `after(session)` method.
        2. Concluding AFTER the task was actually executed. This will just change the
           `result` of the session and continue with all following advices.
        Args:
            result (bandsaw.result.Result): The result to conclude with.
        """
        self.result = result
        self._moderator.skip(self)
    def save(self, stream):
        """
        Suspend the session to be resumed later or elsewhere.
        """
        self._store_as_zip(stream)
    def restore(self, stream):
        """
        Resume a prior suspended session.
        """
        self._load_from_zip(stream)
        return self
    def _load_from_zip(self, stream):
        # We don't use `with` here, because we don't want to close the zip file.
        # This allows the attachments container to access the attachments from the
        # archive.
        archive = zipfile.ZipFile(stream, 'r')  # pylint: disable=consider-using-with
        session_json = json.loads(archive.read('session.json'))
        self.configuration = get_configuration(session_json['configuration'])
        self._advice_chain = session_json['advice_chain']
        self._ids = Ids.from_string(session_json['ids'])
        serializer = self.configuration.serializer
        stream = io.BytesIO(archive.read('task.dat'))
        self.task = serializer.deserialize(stream)
        stream = io.BytesIO(archive.read('execution.dat'))
        self.execution = serializer.deserialize(stream)
        stream = io.BytesIO(archive.read('context.dat'))
        self.context = serializer.deserialize(stream)
        stream = io.BytesIO(archive.read('result.dat'))
        self.result = serializer.deserialize(stream)
        stream = io.BytesIO(archive.read('moderator.dat'))
        self._moderator = serializer.deserialize(stream)
        if self._moderator is not None:
            self._moderator.advice_chain = self.configuration.get_advice_chain(
                self._advice_chain
            )
        self.attachments = Attachments(archive)
    def _store_as_zip(self, stream):
        serializer = self.configuration.serializer
        with zipfile.ZipFile(stream, 'w') as archive:
            session_json = json.dumps(
                {
                    'configuration': self.configuration.module_name,
                    'advice_chain': self._advice_chain,
                    'ids': str(self.ids),
                }
            )
            archive.writestr('session.json', session_json)
            stream = io.BytesIO()
            serializer.serialize(self.task, stream)
            archive.writestr('task.dat', stream.getvalue())
            stream = io.BytesIO()
            serializer.serialize(self.execution, stream)
            archive.writestr('execution.dat', stream.getvalue())
            stream = io.BytesIO()
            serializer.serialize(self.context, stream)
            archive.writestr('context.dat', stream.getvalue())
            stream = io.BytesIO()
            serializer.serialize(self.result, stream)
            archive.writestr('result.dat', stream.getvalue())
            stream = io.BytesIO()
            serializer.serialize(self._moderator, stream)
            archive.writestr('moderator.dat', stream.getvalue())
            self.attachments.store(archive)
distribution_archive
property
readonly
The DistributionArchive which can be used when transferring the session.
ids
property
readonly
The ids of this session.
run_id
property
readonly
The run id of the workflow.
serializer
property
readonly
The serializer that can be used for serializing values.
session_id
property
readonly
The id of this session as string.
temp_dir
property
readonly
Temporary directory where session-specific files can be written.
This directory is meant for temporary files that are used by the individual Advice instances. It is located at configuration.temporary_directory / ids.as_path(), so each session gets its own subdirectory. The directory already exists when it is returned and will be deleted automatically when the Python interpreter exits. Nonetheless, advices that write files to it should, if possible, remove them once they are no longer needed.
Returns:
Type | Description |
---|---|
pathlib.Path | Path to the temporary directory. |
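A minimal sketch of how such a per-session directory can be derived and used. session_temp_dir is a hypothetical helper, and a throwaway directory from tempfile stands in for configuration.temporary_directory.

```python
import pathlib
import tempfile


def session_temp_dir(base_dir, task_id, execution_id, run_id):
    """Hypothetical helper mirroring Session.temp_dir: one directory per session."""
    temp_dir = pathlib.Path(base_dir) / task_id / execution_id / run_id
    # parents=True creates missing intermediate directories and
    # exist_ok=True makes repeated calls harmless.
    temp_dir.mkdir(parents=True, exist_ok=True)
    return temp_dir


with tempfile.TemporaryDirectory() as base:
    session_dir = session_temp_dir(base, "task1", "exec2", "run3")
    scratch = session_dir / "scratch.txt"
    scratch.write_text("intermediate data")
    # ... an advice would use the file here ...
    scratch.unlink()  # remove the file as soon as it is no longer needed
    print(session_dir.relative_to(base).as_posix())  # task1/exec2/run3
```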
__init__(self, task=None, execution=None, configuration=None, advice_chain='default')
special
Create a new session.
Source code in bandsaw/session.py
def __init__(
    self,
    task=None,
    execution=None,
    configuration=None,
    advice_chain='default',
):
    """
    Create a new session.
    """
    self.task = task
    self.execution = execution
    self.context = {}
    self.result = None
    self.attachments = Attachments()
    self.configuration = configuration
    self._advice_chain = advice_chain
    self._moderator = None
    self._ids = None
    self._temp_dir = None
conclude(self, result)
Conclude the process of advising with a Result.
This can be used in two cases:
1. Concluding BEFORE the task was actually executed. This skips all subsequent advices defined later in the advice chain and skips the task execution. The given result is then used as the preliminary result. All advices that are defined before the calling advice in the advice chain will still be called with their after(session) method.
2. Concluding AFTER the task was actually executed. This just changes the result of the session and continues with all following advices.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
result | bandsaw.result.Result | The result to conclude with. | required |
Source code in bandsaw/session.py
def conclude(self, result):
    """
    Conclude the process of advising with a `Result`.
    This can be used in two cases:
    1. Concluding BEFORE the task was actually executed. This will skip all
       subsequent advices defined later in the advice chain and will skip the
       task execution. The given `result` will then be used as preliminary result.
       All advices that are defined before the calling advice in the advice chain
       will still be called with their `after(session)` method.
    2. Concluding AFTER the task was actually executed. This will just change the
       `result` of the session and continue with all following advices.
    Args:
        result (bandsaw.result.Result): The result to conclude with.
    """
    self.result = result
    self._moderator.skip(self)
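A typical use of concluding before execution is a cache. The sketch below is hypothetical: CachingAdvice and StubSession are illustrative names, results are plain values instead of bandsaw.result.Result objects, and the stub only records which session method the advice called. Because session_id includes the run id, this particular cache only helps within a single run.

```python
class CachingAdvice:
    """Hypothetical advice that serves results from an in-memory cache."""

    def __init__(self):
        self.cache = {}

    def before(self, session):
        if session.session_id in self.cache:
            # Cache hit: skip later advices and the task execution itself.
            session.conclude(self.cache[session.session_id])
        else:
            session.proceed()

    def after(self, session):
        # Only reached after a real execution: a concluding advice's own
        # after() is not called, so cached results are not stored twice.
        self.cache[session.session_id] = session.result
        session.proceed()


class StubSession:
    """Minimal stand-in that records which method the advice called."""

    def __init__(self, session_id, result=None):
        self.session_id = session_id
        self.result = result
        self.calls = []

    def proceed(self):
        self.calls.append('proceed')

    def conclude(self, result):
        self.result = result
        self.calls.append('conclude')


advice = CachingAdvice()
first = StubSession('t_e_r')
advice.before(first)       # cache miss -> proceed
first.result = 'computed'  # pretend the task ran
advice.after(first)        # stores the result

second = StubSession('t_e_r')
advice.before(second)      # cache hit -> conclude
print(second.calls, second.result)  # ['conclude'] computed
```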
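initiate(self)
Start the process of advising an execution.
If the advice chain does not finish, for example because an advice neither proceeded nor concluded, a RuntimeError naming the misbehaving advice is raised.
Returns:
Type | Description |
---|---|
bandsaw.result.Result | The final result of the execution after all advices. |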
Source code in bandsaw/session.py
def initiate(self):
    """
    Start the process of advising an execution.
    Returns:
        bandsaw.result.Result: The final result of the execution after all
            advices.
    """
    self._moderator = _Moderator(
        self.configuration.get_advice_chain(self._advice_chain)
    )
    logger.debug("running extensions before advice")
    for extension in self.configuration.extensions:
        extension.on_session_created(self)
    self.proceed()
    if not self._moderator.is_finished:
        raise RuntimeError(
            f"Not all advice has been applied. "
            f"Misbehaving advice {self._moderator.current_advice}"
        )
    logger.debug("running extensions after advice")
    for extension in self.configuration.extensions:
        extension.on_session_finished(self)
    return self.result
proceed(self)
Continue the process of advising with the next advice.
Source code in bandsaw/session.py
def proceed(self):
    """
    Continue the process of advising with the next advice.
    """
    self._moderator.next(self)
restore(self, stream)
Resume a prior suspended session.
Source code in bandsaw/session.py
def restore(self, stream):
    """
    Resume a prior suspended session.
    """
    self._load_from_zip(stream)
    return self
save(self, stream)
Suspend the session to be resumed later or elsewhere.
Source code in bandsaw/session.py
def save(self, stream):
    """
    Suspend the session to be resumed later or elsewhere.
    """
    self._store_as_zip(stream)
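save() and restore() write and read a zip archive with a session.json metadata entry plus one serialized .dat member per component. The following sketch shows the same layout in isolation. store() and load() are hypothetical helpers, and pickle stands in for configuration.serializer; unlike Session._load_from_zip, the sketch closes the archive, because it has no attachments that need to stay readable.

```python
import io
import json
import pickle
import zipfile


def store(stream, meta, entries):
    """Write JSON metadata plus one serialized member per entry."""
    with zipfile.ZipFile(stream, "w") as archive:
        archive.writestr("session.json", json.dumps(meta))
        for name, value in entries.items():
            buffer = io.BytesIO()
            pickle.dump(value, buffer)  # stand-in for configuration.serializer
            archive.writestr(f"{name}.dat", buffer.getvalue())


def load(stream):
    """Read back what store() wrote."""
    with zipfile.ZipFile(stream, "r") as archive:
        meta = json.loads(archive.read("session.json"))
        entries = {
            member[: -len(".dat")]: pickle.loads(archive.read(member))
            for member in archive.namelist()
            if member.endswith(".dat")
        }
    return meta, entries


buffer = io.BytesIO()
store(buffer, {"advice_chain": "default", "ids": "task1_exec2_run3"},
      {"context": {"attempt": 1}, "result": None})
buffer.seek(0)
meta, entries = load(buffer)
print(meta["ids"], entries["context"])  # task1_exec2_run3 {'attempt': 1}
```

Since unpickling can execute arbitrary code, archives serialized this way should only be loaded from trusted sources.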
tasks
Contains classes and functions representing different types of tasks.
Task (SerializableValue, ABC)
Base class for the different types of Tasks that can be executed.
Attributes:
Name | Type | Description |
---|---|---|
task_id | str | A unique identifier for the individual tasks. |
advice_parameters | dict | A dictionary with additional arguments provided at task definition. |
source | str | The python source code as string which defines the task. |
bytecode | bytes | The compiled byte code of the task definition. |
Source code in bandsaw/tasks.py
class Task(SerializableValue, abc.ABC):
    """Base-class for different types of `Tasks` that can be executed
    Attributes:
        task_id (str): A unique identifier for the individual tasks.
        advice_parameters (dict): A dictionary with additional arguments provided at
            task definition.
        source (str): The python source code as string which defines the task.
        bytecode (bytes): The compiled byte code of the task definition.
    """
    # For different types of callable
    # https://stackoverflow.com/questions/19314405/how-to-detect-is-decorator-has-been-applied-to-method-or-function
    def __init__(self, task_id, advice_parameters):
        self.task_id = task_id
        self._advice_parameters = advice_parameters
    @property
    def advice_parameters(self):
        """Additional parameters for advices defined at task definition."""
        return dict(self._advice_parameters)
    @property
    @abc.abstractmethod
    def source(self):
        """The python source code as `str` which defines the task."""
    @property
    @abc.abstractmethod
    def bytecode(self):
        """The compiled byte code of the task definition as `bytes`."""
    @property
    @abc.abstractmethod
    def signature(self):
        """The signature() of the callable representing this task."""
    @abc.abstractmethod
    def _execute(self, args, kwargs):
        """
        Execute the task with the given arguments.
        Args:
            args: The positional arguments to use during execution.
            kwargs: The keyword arguments to use during execution.
        Returns:
            Any: The returned value from the task.
        Raises:
            Any: During the execution the task can raise arbitrary exceptions.
        """
    def execute(self, execution):
        """
        Execute the task with the arguments specified by the execution.
        Args:
            execution (bandsaw.execution.Execution): The definition which contains how
                the task should be executed.
        Returns:
            bandsaw.result.Result: A `Result` object with either the returned value
                from the task or an exception that was raised by the task.
        """
        try:
            result_value = self._execute(execution.args, execution.kwargs)
            result = Result(value=result_value)
        except Exception as error:  # pylint: disable=W0703 # too general exception
            result = Result(exception=error)
        return result
    @classmethod
    def create_task(cls, obj, advice_parameters=None):
        """
        Factory for creating a task for different Python objects.
        Args:
            obj (Any): Python object that should be run as a task.
            advice_parameters (dict): A dictionary containing additional arguments to
                be used by the advices.
        Returns:
            bandsaw.tasks.Task: Instance of `Task` class that allows to execute the
                task.
        Raises:
            TypeError: If there is no support for this type of python object.
        """
        if advice_parameters is None:
            advice_parameters = {}
        if isinstance(obj, types.FunctionType):
            if '.<locals>.' in obj.__qualname__:
                return _FunctionWithClosureTask(obj, advice_parameters)
            function_name, module_name = object_as_import(obj)
            return _FunctionTask(function_name, module_name, advice_parameters)
        raise TypeError(f"Unsupported task object of type {type(obj)}")
advice_parameters
property
readonly
Additional parameters for advices defined at task definition.
bytecode
property
readonly
The compiled byte code of the task definition as bytes.
signature
property
readonly
The signature() of the callable representing this task.
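The signature is what allows the tracker (see _argument_infos in TrackerExtension below) to name every argument of an execution: it binds the positional and keyword arguments and then fills in defaults. The standard-library calls behave like this; train is a hypothetical task function used only for illustration.

```python
import inspect


def train(data, epochs=10, *, verbose=False):
    """Hypothetical task function."""


signature = inspect.signature(train)
bound = signature.bind([1, 2, 3], 5)  # positional args map onto parameter names
bound.apply_defaults()                # verbose was not passed, so its default is added
print(dict(bound.arguments))  # {'data': [1, 2, 3], 'epochs': 5, 'verbose': False}
```

If the arguments do not fit the signature, for example when a required parameter is missing, bind() raises a TypeError.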
source
property
readonly
The Python source code, as str, which defines the task.
create_task(obj, advice_parameters=None)
classmethod
Factory for creating a task for different Python objects.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj | Any | Python object that should be run as a task. | required |
advice_parameters | dict | A dictionary containing additional arguments to be used by the advices. | None |
Returns:
Type | Description |
---|---|
bandsaw.tasks.Task | An instance of the Task class that can execute the task. |
Exceptions:
Type | Description |
---|---|
TypeError | If there is no support for this type of Python object. Currently only plain functions (types.FunctionType) are supported. |
Source code in bandsaw/tasks.py
@classmethod
def create_task(cls, obj, advice_parameters=None):
    """
    Factory for creating a task for different Python objects.
    Args:
        obj (Any): Python object that should be run as a task.
        advice_parameters (dict): A dictionary containing additional arguments to
            be used by the advices.
    Returns:
        bandsaw.tasks.Task: Instance of `Task` class that allows to execute the
            task.
    Raises:
        TypeError: If there is no support for this type of python object.
    """
    if advice_parameters is None:
        advice_parameters = {}
    if isinstance(obj, types.FunctionType):
        if '.<locals>.' in obj.__qualname__:
            return _FunctionWithClosureTask(obj, advice_parameters)
        function_name, module_name = object_as_import(obj)
        return _FunctionTask(function_name, module_name, advice_parameters)
    raise TypeError(f"Unsupported task object of type {type(obj)}")
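The factory distinguishes module-level functions, which can presumably be re-imported by module and name, from nested functions, whose __qualname__ contains '.<locals>.'. The sketch below reproduces only that branching; classify() and the returned labels are hypothetical names, not part of bandsaw.

```python
import types


def module_level():
    return 1


def make_closure(offset):
    def inner(x):
        return x + offset
    return inner


def classify(obj):
    """Mirror the branching in Task.create_task (labels are illustrative only)."""
    if isinstance(obj, types.FunctionType):
        if '.<locals>.' in obj.__qualname__:
            return 'function-with-closure'
        return 'importable-function'
    raise TypeError(f"Unsupported task object of type {type(obj)}")


print(classify(module_level))        # importable-function
print(classify(make_closure(1)))     # function-with-closure
print(make_closure(1).__qualname__)  # make_closure.<locals>.inner
```

Built-in functions such as len are not instances of types.FunctionType, so passing them ends in the TypeError branch.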
execute(self, execution)
Execute the task with the arguments specified by the execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
execution | bandsaw.execution.Execution | The definition which contains how the task should be executed. | required |
Returns:
Type | Description |
---|---|
bandsaw.result.Result | A Result object with either the returned value from the task or an exception that was raised by the task. |
Source code in bandsaw/tasks.py
def execute(self, execution):
    """
    Execute the task with the arguments specified by the execution.
    Args:
        execution (bandsaw.execution.Execution): The definition which contains how
            the task should be executed.
    Returns:
        bandsaw.result.Result: A `Result` object with either the returned value
            from the task or an exception that was raised by the task.
    """
    try:
        result_value = self._execute(execution.args, execution.kwargs)
        result = Result(value=result_value)
    except Exception as error:  # pylint: disable=W0703 # too general exception
        result = Result(exception=error)
    return result
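The key design choice in execute() is that exceptions become data: the caller always gets a result object back, so advices can inspect or replace a failure just like a value. A minimal sketch of that pattern, with SketchResult as a hypothetical stand-in for bandsaw.result.Result:

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for bandsaw.result.Result."""
    value: Any = None
    exception: Optional[BaseException] = None


def execute(func, args, kwargs):
    try:
        return SketchResult(value=func(*args, **kwargs))
    except Exception as error:  # deliberately broad: the error becomes data
        return SketchResult(exception=error)


ok = execute(divmod, (7, 2), {})
failed = execute(divmod, (7, 0), {})
print(ok.value)                         # (3, 1)
print(type(failed.exception).__name__)  # ZeroDivisionError
```

Catching Exception rather than BaseException, as the original does, still lets KeyboardInterrupt and SystemExit propagate.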
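tracking
special
backend
Interface for tracking backends.
Backend
Base class for backend implementations. Every tracking method of the base class is a no-op, so a concrete backend only has to override the methods for the data it wants to record.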
Source code in bandsaw/tracking/backend.py
class Backend:
    """Base class for backend implementations"""
    def track_run(self, ids, run_info):
        """
        Track a run
        Args:
            ids (bandsaw.session.Ids): Ids where the run was first used.
            run_info (Dict[str,Any]): A dictionary containing tracking information for
                this run.
        """
    def track_distribution_archive(self, distribution_archive):
        """
        Track a distribution archive.
        Args:
            distribution_archive (bandsaw.distribution.DistributionArchive): The
                archive which should be tracked.
        """
    def track_task(self, ids, task_info):
        """
        Track a task.
        Args:
            ids (bandsaw.session.Ids): Ids where task was first used.
            task_info (Dict[str,Any]): A dictionary containing tracking information
                for a task.
        """
    def track_execution(self, ids, execution_info):
        """
        Track an execution.
        Args:
            ids (bandsaw.session.Ids): Ids where task was first used.
            execution_info (Dict[str,Any]): A dictionary containing tracking
                information for the execution.
        """
    def track_session(self, ids, session_info):
        """
        Track a session.
        Args:
            ids (bandsaw.session.Ids): Ids where task was first used.
            session_info (Dict[str,Any]): A dictionary containing tracking
                information for this session.
        """
    def track_result(self, ids, result_info):
        """
        Track the result of a session.
        Args:
            ids (bandsaw.session.Ids): Ids where task was first used.
            result_info (Dict[str,Any]): A dictionary containing tracking
                information for this result.
        """
    def track_attachments(self, ids, attachments):
        """
        Track the attachments of a session.
        Args:
            ids (bandsaw.session.Ids): Ids where task was first used.
            attachments (bandsaw.session.Attachments): An instance of `Attachments`
                which gives access to the files that were attached to a session.
        """
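As a sketch of what a custom backend might look like, the class below keeps all tracking data in dictionaries. InMemoryBackend and StubIds are hypothetical; in a real configuration the class would subclass bandsaw.tracking.backend.Backend, which is left out here so the example runs without bandsaw installed. The sketch also assumes that the attachments object offers a mapping-style items(), as FileSystemBackend relies on.

```python
from collections import namedtuple

# Hypothetical stand-in for bandsaw.session.Ids.
StubIds = namedtuple('StubIds', 'task_id execution_id run_id')


class InMemoryBackend:
    """Hypothetical tracking backend mirroring the Backend method names."""

    def __init__(self):
        self.runs, self.tasks, self.executions = {}, {}, {}
        self.sessions, self.results, self.attachments = {}, {}, {}

    def track_run(self, ids, run_info):
        self.runs[ids.run_id] = run_info

    def track_distribution_archive(self, distribution_archive):
        pass  # nothing to record in this sketch

    def track_task(self, ids, task_info):
        self.tasks[ids.task_id] = task_info

    def track_execution(self, ids, execution_info):
        self.executions[(ids.task_id, ids.execution_id)] = execution_info

    def track_session(self, ids, session_info):
        self.sessions[str(ids)] = session_info

    def track_result(self, ids, result_info):
        self.results[str(ids)] = result_info

    def track_attachments(self, ids, attachments):
        # Only the attachment names are recorded, not their contents.
        self.attachments[str(ids)] = sorted(name for name, _ in attachments.items())


ids = StubIds('task1', 'exec2', 'run3')
backend = InMemoryBackend()
backend.track_task(ids, {'task': {'id': 'task1'}})
backend.track_result(ids, {'value': 42})
print(backend.tasks)  # {'task1': {'task': {'id': 'task1'}}}
```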
track_attachments(self, ids, attachments)
Track the attachments of a session.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids | bandsaw.session.Ids | Ids where task was first used. | required |
attachments | bandsaw.session.Attachments | An instance of Attachments which gives access to the files that were attached to a session. | required |
Source code in bandsaw/tracking/backend.py
def track_attachments(self, ids, attachments):
    """
    Track the attachments of a session.
    Args:
        ids (bandsaw.session.Ids): Ids where task was first used.
        attachments (bandsaw.session.Attachments): An instance of `Attachments`
            which gives access to the files that were attached to a session.
    """
track_distribution_archive(self, distribution_archive)
Track a distribution archive.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
distribution_archive | bandsaw.distribution.DistributionArchive | The archive which should be tracked. | required |
Source code in bandsaw/tracking/backend.py
def track_distribution_archive(self, distribution_archive):
    """
    Track a distribution archive.
    Args:
        distribution_archive (bandsaw.distribution.DistributionArchive): The
            archive which should be tracked.
    """
track_execution(self, ids, execution_info)
Track an execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids | bandsaw.session.Ids | Ids where task was first used. | required |
execution_info | Dict[str,Any] | A dictionary containing tracking information for the execution. | required |
Source code in bandsaw/tracking/backend.py
def track_execution(self, ids, execution_info):
    """
    Track an execution.
    Args:
        ids (bandsaw.session.Ids): Ids where task was first used.
        execution_info (Dict[str,Any]): A dictionary containing tracking
            information for the execution.
    """
track_result(self, ids, result_info)
Track the result of a session.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids | bandsaw.session.Ids | Ids where task was first used. | required |
result_info | Dict[str,Any] | A dictionary containing tracking information for this result. | required |
Source code in bandsaw/tracking/backend.py
def track_result(self, ids, result_info):
    """
    Track the result of a session.
    Args:
        ids (bandsaw.session.Ids): Ids where task was first used.
        result_info (Dict[str,Any]): A dictionary containing tracking
            information for this result.
    """
track_run(self, ids, run_info)
Track a run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids | bandsaw.session.Ids | Ids where the run was first used. | required |
run_info | Dict[str,Any] | A dictionary containing tracking information for this run. | required |
Source code in bandsaw/tracking/backend.py
def track_run(self, ids, run_info):
    """
    Track a run
    Args:
        ids (bandsaw.session.Ids): Ids where the run was first used.
        run_info (Dict[str,Any]): A dictionary containing tracking information for
            this run.
    """
track_session(self, ids, session_info)
Track a session.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids | bandsaw.session.Ids | Ids where task was first used. | required |
session_info | Dict[str,Any] | A dictionary containing tracking information for this session. | required |
Source code in bandsaw/tracking/backend.py
def track_session(self, ids, session_info):
    """
    Track a session.
    Args:
        ids (bandsaw.session.Ids): Ids where task was first used.
        session_info (Dict[str,Any]): A dictionary containing tracking
            information for this session.
    """
track_task(self, ids, task_info)
Track a task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids | bandsaw.session.Ids | Ids where task was first used. | required |
task_info | Dict[str,Any] | A dictionary containing tracking information for a task. | required |
Source code in bandsaw/tracking/backend.py
def track_task(self, ids, task_info):
    """
    Track a task.
    Args:
        ids (bandsaw.session.Ids): Ids where task was first used.
        task_info (Dict[str,Any]): A dictionary containing tracking information
            for a task.
    """
filesystem
Tracking backend using the file system.
FileSystemBackend (Backend)
Tracking backend that stores data in the local file system.
Source code in bandsaw/tracking/filesystem.py
class FileSystemBackend(Backend):
    """Tracking backend that stores data in the local file system."""
    def __init__(self, directory):
        """
        Create a new backend.
        Args:
            directory (str): Directory where the tracking data will be stored.
        """
        self.directory = pathlib.Path(directory)
        logger.info("Tracking sessions in directory '%s'", self.directory)
        super().__init__()
    def track_run(self, ids, run_info):
        run_dir = self.directory / 'runs' / ids.run_id
        run_dir.mkdir(parents=True)
        run_info_path = run_dir / 'run-info.json'
        with run_info_path.open('w') as stream:
            json.dump(run_info, stream)
    def track_task(self, ids, task_info):
        task_dir = self.directory / 'tasks' / ids.task_id
        task_dir.mkdir(parents=True, exist_ok=True)
        task_info_path = task_dir / 'task-info.json'
        with task_info_path.open('w') as stream:
            json.dump(task_info, stream)
    def track_execution(self, ids, execution_info):
        execution_dir = self.directory / 'tasks' / ids.task_id / ids.execution_id
        execution_dir.mkdir(parents=True, exist_ok=True)
        execution_info_path = execution_dir / 'execution-info.json'
        with execution_info_path.open('w') as stream:
            json.dump(execution_info, stream)
    def track_session(self, ids, session_info):
        self._store_session_info(ids, session_info)
        self._store_session_for_run(ids)
    def track_result(self, ids, result_info):
        self._store_session_result(ids, result_info)
    def track_attachments(self, ids, attachments):
        self._store_session_attachments(ids, attachments)
    def _store_session_for_run(self, ids):
        run_dir = self.directory / 'runs' / ids.run_id
        run_dir.mkdir(parents=True, exist_ok=True)
        run_session_file = run_dir / str(ids)
        run_session_file.touch()
    def _store_session_info(self, ids, session_info):
        session_dir = (
            self.directory / 'tasks' / ids.task_id / ids.execution_id / ids.run_id
        )
        session_dir.mkdir(parents=True, exist_ok=True)
        session_info_file = session_dir / 'session-info.json'
        with session_info_file.open('w') as stream:
            json.dump(session_info, stream)
    def _store_session_result(self, ids, result_info):
        session_dir = (
            self.directory / 'tasks' / ids.task_id / ids.execution_id / ids.run_id
        )
        session_dir.mkdir(parents=True, exist_ok=True)
        session_info_file = session_dir / 'result-info.json'
        with session_info_file.open('w') as stream:
            json.dump(result_info, stream)
    def _store_session_attachments(self, ids, attachments):
        session_dir = (
            self.directory / 'tasks' / ids.task_id / ids.execution_id / ids.run_id
        )
        attachments_dir = session_dir / 'attachments'
        attachments_dir.mkdir(parents=True)
        for name, attachment in attachments.items():
            attachment_path = attachments_dir / name
            with attachment.open() as input_stream:
                with attachment_path.open('wb') as output_stream:
                    shutil.copyfileobj(input_stream, output_stream)
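Read together, the methods above produce a fixed directory layout under the tracking directory. The hypothetical helper below only computes those paths without touching the disk; the run marker file name assumes that str(ids) joins the three ids with '_'.

```python
from pathlib import PurePosixPath


def tracking_paths(directory, task_id, execution_id, run_id):
    """Hypothetical helper listing where FileSystemBackend writes each record."""
    root = PurePosixPath(directory)
    session_dir = root / 'tasks' / task_id / execution_id / run_id
    return {
        'run': root / 'runs' / run_id / 'run-info.json',
        # Assumption: str(ids) is '<task>_<execution>_<run>'.
        'run_marker': root / 'runs' / run_id / f'{task_id}_{execution_id}_{run_id}',
        'task': root / 'tasks' / task_id / 'task-info.json',
        'execution': root / 'tasks' / task_id / execution_id / 'execution-info.json',
        'session': session_dir / 'session-info.json',
        'result': session_dir / 'result-info.json',
        'attachments': session_dir / 'attachments',
    }


for kind, path in tracking_paths('/data', 't1', 'e1', 'r1').items():
    print(f'{kind:12} {path}')
```

Tasks, executions and sessions are nested by id, while runs get their own tree with one empty marker file per session, so a run's sessions can be listed without walking the task tree.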
__init__(self, directory)
special
Create a new backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
directory | str | Directory where the tracking data will be stored. | required |
Source code in bandsaw/tracking/filesystem.py
def __init__(self, directory):
    """
    Create a new backend.
    Args:
        directory (str): Directory where the tracking data will be stored.
    """
    self.directory = pathlib.Path(directory)
    logger.info("Tracking sessions in directory '%s'", self.directory)
    super().__init__()
track_attachments(self, ids, attachments)
Track the attachments of a session.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids | bandsaw.session.Ids | Ids where task was first used. | required |
attachments | bandsaw.session.Attachments | An instance of Attachments which gives access to the files that were attached to a session. | required |
Source code in bandsaw/tracking/filesystem.py
def track_attachments(self, ids, attachments):
    self._store_session_attachments(ids, attachments)
track_execution(self, ids, execution_info)
Track an execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids | bandsaw.session.Ids | Ids where task was first used. | required |
execution_info | Dict[str,Any] | A dictionary containing tracking information for the execution. | required |
Source code in bandsaw/tracking/filesystem.py
def track_execution(self, ids, execution_info):
    execution_dir = self.directory / 'tasks' / ids.task_id / ids.execution_id
    execution_dir.mkdir(parents=True, exist_ok=True)
    execution_info_path = execution_dir / 'execution-info.json'
    with execution_info_path.open('w') as stream:
        json.dump(execution_info, stream)
track_result(self, ids, result_info)
Track the result of a session.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids | bandsaw.session.Ids | Ids where task was first used. | required |
result_info | Dict[str,Any] | A dictionary containing tracking information for this result. | required |
Source code in bandsaw/tracking/filesystem.py
def track_result(self, ids, result_info):
    self._store_session_result(ids, result_info)
track_run(self, ids, run_info)
Track a run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids | bandsaw.session.Ids | Ids where the run was first used. | required |
run_info | Dict[str,Any] | A dictionary containing tracking information for this run. | required |
Source code in bandsaw/tracking/filesystem.py
def track_run(self, ids, run_info):
    run_dir = self.directory / 'runs' / ids.run_id
    run_dir.mkdir(parents=True)
    run_info_path = run_dir / 'run-info.json'
    with run_info_path.open('w') as stream:
        json.dump(run_info, stream)
track_session(self, ids, session_info)
Track a session.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids | bandsaw.session.Ids | Ids where task was first used. | required |
session_info | Dict[str,Any] | A dictionary containing tracking information for this session. | required |
Source code in bandsaw/tracking/filesystem.py
def track_session(self, ids, session_info):
    self._store_session_info(ids, session_info)
    self._store_session_for_run(ids)
track_task(self, ids, task_info)
Track a task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids | bandsaw.session.Ids | Ids where task was first used. | required |
task_info | Dict[str,Any] | A dictionary containing tracking information for a task. | required |
Source code in bandsaw/tracking/filesystem.py
def track_task(self, ids, task_info):
    task_dir = self.directory / 'tasks' / ids.task_id
    task_dir.mkdir(parents=True, exist_ok=True)
    task_info_path = task_dir / 'task-info.json'
    with task_info_path.open('w') as stream:
        json.dump(task_info, stream)
tracker
Contains an extension that tracks task executions using a tracking backend.
TrackerExtension (Extension)
Extension that tracks task executions and their data using a tracking backend, for example the FileSystemBackend.
Attributes:
Name | Type | Description |
---|---|---|
_backend | bandsaw.tracking.backend.Backend | The backend implementation to use. |
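TrackerExtension forwards each run, task, execution, session, result and set of attachments to the backend only once, remembering already tracked ids in per-instance sets. The pattern in isolation, with TrackOnce as a hypothetical helper:

```python
class TrackOnce:
    """Hypothetical helper showing TrackerExtension's 'track each id once' pattern."""

    def __init__(self, track):
        self._track = track
        self._seen = set()

    def __call__(self, key, info):
        if key not in self._seen:
            self._track(key, info)
            self._seen.add(key)


calls = []
track_task = TrackOnce(lambda key, info: calls.append(key))
for task_id in ['a', 'b', 'a', 'a']:
    track_task(task_id, {'task': {'id': task_id}})
print(calls)  # ['a', 'b']
```

Because the sets are created in __init__, the deduplication only spans a single extension instance; a fresh instance, for example in another process, starts with empty sets.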
Source code in bandsaw/tracking/tracker.py
class TrackerExtension(Extension):
    """
    Extension that tracks task executions and their data using a tracking backend.

    Attributes:
        _backend (bandsaw.tracking.backend.Backend): The backend implementation to use.
    """

    def __init__(self, backend):
        """
        Extension that tracks task executions and their data using a tracking backend.

        Args:
            backend (bandsaw.tracking.backend.Backend): The backend implementation to
                use.

        Raises:
            TypeError: If `backend` does not inherit from `Backend` base class.
            ValueError: If no backend is given.
        """
        if backend is None:
            raise ValueError("Backend must be set.")
        if not isinstance(backend, Backend):
            raise TypeError("`backend` is not of type `Backend`.")
        self._backend = backend
        logger.info("Tracking sessions using backend '%s'", self._backend)
        # IDs already reported to the backend, so each item is tracked only once.
        self._tracked_runs = set()
        self._tracked_tasks = set()
        self._tracked_executions = set()
        self._tracked_sessions = set()
        self._tracked_results = set()
        self._tracked_attachments = set()
        super().__init__()

    def on_session_created(self, session):
        self._track_run(session)
        self._track_task(session)
        self._track_execution(session)
        self._track_session(session)

    def on_session_finished(self, session):
        self._track_result(session)
        self._track_attachments(session)

    def _track_run(self, session):
        if session.run_id not in self._tracked_runs:
            self._backend.track_run(session.ids, {'id': session.run_id})
            self._tracked_runs.add(session.run_id)

    def _track_task(self, session):
        if session.task.task_id not in self._tracked_tasks:
            self._backend.track_task(session.ids, self._create_task_info(session))
            self._tracked_tasks.add(session.task.task_id)

    def _track_execution(self, session):
        # Combine task and execution ID to form the key for this execution.
        combined_id = session.task.task_id + '_' + session.execution.execution_id
        if combined_id not in self._tracked_executions:
            self._backend.track_execution(
                session.ids, self._create_execution_info(session)
            )
            self._tracked_executions.add(combined_id)

    def _track_session(self, session):
        if session.session_id not in self._tracked_sessions:
            self._backend.track_session(session.ids, self._create_session_info(session))
            self._tracked_sessions.add(session.session_id)

    def _track_result(self, session):
        if session.session_id not in self._tracked_results:
            self._backend.track_result(session.ids, self._create_result_info(session))
            self._tracked_results.add(session.session_id)

    def _track_attachments(self, session):
        if session.session_id not in self._tracked_attachments:
            self._backend.track_attachments(session.ids, session.attachments)
            self._tracked_attachments.add(session.session_id)

    @staticmethod
    def _create_run_info(session):
        run_info = {
            'run': {
                'id': session.run_id,
            },
            'configuration': session.configuration.module_name,
            'distribution_archive': {
                'modules': session.distribution_archive.modules,
                'id': None,  # session.distribution_archive.archive_id,
            },
        }
        return run_info

    @staticmethod
    def _create_task_info(session):
        task_info = {
            'task': {
                'id': session.task.task_id,
                'definition': str(session.task),
                'advice_parameters': session.task.advice_parameters,
            },
        }
        return task_info

    def _create_execution_info(self, session):
        def _argument_infos(task, execution):
            """
            Value infos for all arguments of this execution.

            The arguments are bound to the task's signature, so parameters that
            were not passed explicitly are included with their default values.

            Returns:
                List[dict]: One info dict per argument, with the argument's name
                stored under the key 'name'.
            """
            signature = task.signature
            bound_args = signature.bind(*execution.args, **execution.kwargs)
            bound_args.apply_defaults()
            all_infos = []
            for name, value in bound_args.arguments.items():
                info = value_info(value)
                info['name'] = name
                all_infos.append(info)
            return all_infos

        execution_info = self._create_task_info(session)
        execution_info['execution'] = {
            'id': session.execution.execution_id,
            'arguments': _argument_infos(session.task, session.execution),
        }
        return execution_info

    def _create_session_info(self, session):
        tracking_info = self._create_execution_info(session)
        tracking_info.update(self._create_run_info(session))
        tracking_info.update(
            {
                'session': {
                    'id': str(session.session_id),
                },
                'task': {
                    'id': session.task.task_id,
                    'definition': str(session.task),
                    'advice_parameters': session.task.advice_parameters,
                },
            }
        )
        return tracking_info

    def _create_result_info(self, session):
        def _result_value_infos(result_value):
            """
            Value infos for the result value of a task.

            Returns:
                Union[List[dict], dict]: For a dict, a list with one info per entry
                (key stored under 'key'); for a list, a list with one info per
                element (position stored under 'index'); otherwise a single info
                dict.
            """
            result_value_infos = []
            if isinstance(result_value, dict):
                for name, value in result_value.items():
                    info = value_info(value)
                    info['key'] = name
                    result_value_infos.append(info)
            elif isinstance(result_value, list):
                for index, value in enumerate(result_value):
                    info = value_info(value)
                    info['index'] = index
                    result_value_infos.append(info)
            else:
                result_value_infos = value_info(result_value)
            return result_value_infos

        result = session.result
        result_info = self._create_session_info(session)
        result_info['result'] = {}
        if result.exception:
            result_info['result']['exception'] = type(result.exception).__name__
            result_info['result']['message'] = str(result.exception)
        else:
            result_info['result']['value'] = _result_value_infos(result.value)
        return result_info
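`_create_execution_info` records one entry per argument by binding the execution's positional and keyword arguments to `task.signature` (presumably an `inspect.Signature`, given the `bind()` and `apply_defaults()` calls) and then filling in defaults. The standard-library mechanism can be sketched on its own as follows; `value_info` here is a hypothetical stand-in that only records the type name, because the real helper's output is not documented in this section, and `train` is an invented example function.

```python
import inspect


def value_info(value):
    # Hypothetical stand-in for bandsaw's value_info(): records only the type name.
    return {'type': type(value).__name__}


def argument_infos(func, args, kwargs):
    """Return one info dict per parameter of `func`, including defaulted ones."""
    bound_args = inspect.signature(func).bind(*args, **kwargs)
    bound_args.apply_defaults()  # add parameters the caller did not pass
    infos = []
    for name, value in bound_args.arguments.items():
        info = value_info(value)
        info['name'] = name
        infos.append(info)
    return infos


def train(data, epochs=10, *, verbose=False):
    """Invented task function used only for this example."""


print(argument_infos(train, ([1, 2, 3],), {'verbose': True}))
# [{'type': 'list', 'name': 'data'}, {'type': 'int', 'name': 'epochs'}, {'type': 'bool', 'name': 'verbose'}]
```

Note that `Signature.bind()` raises `TypeError` when the arguments do not fit the signature, so an execution with mismatching arguments would already fail while its tracking info is being built.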
__init__(self, backend)
special
Extension that tracks task executions and their data using a tracking backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
backend | bandsaw.tracking.backend.Backend | The backend implementation to use. | required |
Exceptions:
Type | Description |
---|---|
TypeError | If `backend` does not inherit from the `Backend` base class. |
ValueError | If no backend is given. |
Source code in bandsaw/tracking/tracker.py
def __init__(self, backend):
    """
    Extension that tracks task executions and their data using a tracking backend.

    Args:
        backend (bandsaw.tracking.backend.Backend): The backend implementation to
            use.

    Raises:
        TypeError: If `backend` does not inherit from `Backend` base class.
        ValueError: If no backend is given.
    """
    if backend is None:
        raise ValueError("Backend must be set.")
    if not isinstance(backend, Backend):
        raise TypeError("`backend` is not of type `Backend`.")
    self._backend = backend
    logger.info("Tracking sessions using backend '%s'", self._backend)
    # IDs already reported to the backend, so each item is tracked only once.
    self._tracked_runs = set()
    self._tracked_tasks = set()
    self._tracked_executions = set()
    self._tracked_sessions = set()
    self._tracked_results = set()
    self._tracked_attachments = set()
    super().__init__()
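The constructor only accepts instances of `bandsaw.tracking.backend.Backend`, and the extension later calls six methods on it: `track_run`, `track_task`, `track_execution`, `track_session`, `track_result` and `track_attachments`, each with the session's `ids` and a payload. A minimal sketch of a backend that just records these calls could look like this. The `Backend` class below is a local stand-in so the snippet runs on its own; its method names and arguments are assumed from the calls in the tracker source, not taken from the real base class, and `RecordingBackend` and `check_backend` are hypothetical names.

```python
import abc


class Backend(abc.ABC):
    """Local stand-in for bandsaw.tracking.backend.Backend (interface assumed)."""

    @abc.abstractmethod
    def track_run(self, ids, run_info):
        """Track information about a run."""

    @abc.abstractmethod
    def track_task(self, ids, task_info):
        """Track information about a task."""

    @abc.abstractmethod
    def track_execution(self, ids, execution_info):
        """Track information about an execution of a task."""

    @abc.abstractmethod
    def track_session(self, ids, session_info):
        """Track information about a session."""

    @abc.abstractmethod
    def track_result(self, ids, result_info):
        """Track the result of a session."""

    @abc.abstractmethod
    def track_attachments(self, ids, attachments):
        """Track the attachments of a session."""


class RecordingBackend(Backend):
    """Hypothetical backend that keeps every tracking call in a list."""

    def __init__(self):
        self.calls = []

    def track_run(self, ids, run_info):
        self.calls.append(('run', ids, run_info))

    def track_task(self, ids, task_info):
        self.calls.append(('task', ids, task_info))

    def track_execution(self, ids, execution_info):
        self.calls.append(('execution', ids, execution_info))

    def track_session(self, ids, session_info):
        self.calls.append(('session', ids, session_info))

    def track_result(self, ids, result_info):
        self.calls.append(('result', ids, result_info))

    def track_attachments(self, ids, attachments):
        self.calls.append(('attachments', ids, attachments))


def check_backend(backend):
    """Same validation as TrackerExtension.__init__, applied to the stand-in."""
    if backend is None:
        raise ValueError("Backend must be set.")
    if not isinstance(backend, Backend):
        raise TypeError("`backend` is not of type `Backend`.")
    return backend
```

Because the stand-in is an abstract base class, a subclass that omits one of the six methods cannot be instantiated; whether the real `Backend` enforces this in the same way is not shown in this section.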
on_session_created(self, session)
Called before bandsaw advises a task.
This is called before any advice is applied.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The new session. | required |
Source code in bandsaw/tracking/tracker.py
def on_session_created(self, session):
    self._track_run(session)
    self._track_task(session)
    self._track_execution(session)
    self._track_session(session)
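`on_session_created` forwards the run, task, execution and session to the backend, but each `_track_*` helper first checks a set of IDs kept on the extension, so a run or task shared by several sessions is reported only once. The pattern can be sketched with plain stand-in objects; `OncePerIdTracker`, the flattened `SimpleNamespace` sessions (the real code reads `session.task.task_id` and `session.execution.execution_id`) and the `calls` list are illustrative assumptions, not bandsaw's classes.

```python
from types import SimpleNamespace


class OncePerIdTracker:
    """Sketch of the 'track each ID only once' pattern used by TrackerExtension."""

    def __init__(self):
        self.calls = []  # (kind, id) pairs that would be sent to the backend
        self._seen = {'run': set(), 'task': set(), 'execution': set(), 'session': set()}

    def _track_once(self, kind, key):
        if key not in self._seen[kind]:
            self.calls.append((kind, key))
            self._seen[kind].add(key)

    def on_session_created(self, session):
        self._track_once('run', session.run_id)
        self._track_once('task', session.task_id)
        # Task and execution ID are combined, as in _track_execution().
        self._track_once('execution', session.task_id + '_' + session.execution_id)
        self._track_once('session', session.session_id)


tracker = OncePerIdTracker()
for session_id in ('s1', 's2'):
    tracker.on_session_created(SimpleNamespace(
        run_id='run-1', task_id='task-a', execution_id='e1', session_id=session_id,
    ))
print(tracker.calls)
# [('run', 'run-1'), ('task', 'task-a'), ('execution', 'task-a_e1'), ('session', 's1'), ('session', 's2')]
```

Since the ID sets are plain instance attributes, deduplication only covers the sessions seen by that particular extension instance.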
on_session_finished(self, session)
Called after bandsaw has advised a task.
This is called after all advices have been applied and the final result is available.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session | bandsaw.session.Session | The session. | required |
Source code in bandsaw/tracking/tracker.py
def on_session_finished(self, session):
    self._track_result(session)
    self._track_attachments(session)
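Once the session has finished, `_create_result_info` stores either the exception's type name and message or a description of the returned value: dict results get one entry per key, list results one entry per index, and any other value a single info dict. A self-contained sketch of that branching follows. `value_info` and `Result` are hypothetical stand-ins (the real helper and result class are not shown in this section), and the sketch returns only the `result` part rather than merging it into the session info.

```python
from dataclasses import dataclass
from typing import Any, Optional


def value_info(value):
    # Hypothetical stand-in for bandsaw's value_info(): records only the type name.
    return {'type': type(value).__name__}


@dataclass
class Result:
    # Minimal stand-in exposing the two attributes _create_result_info() reads.
    value: Any = None
    exception: Optional[BaseException] = None


def result_value_infos(result_value):
    if isinstance(result_value, dict):
        return [dict(value_info(v), key=k) for k, v in result_value.items()]
    if isinstance(result_value, list):
        return [dict(value_info(v), index=i) for i, v in enumerate(result_value)]
    return value_info(result_value)  # a single dict, not a list


def result_info(result):
    if result.exception:
        return {'exception': type(result.exception).__name__,
                'message': str(result.exception)}
    return {'value': result_value_infos(result.value)}


print(result_info(Result(value={'loss': 0.1})))
# {'value': [{'type': 'float', 'key': 'loss'}]}
print(result_info(Result(exception=ValueError('bad input'))))
# {'exception': 'ValueError', 'message': 'bad input'}
```

As in the tracker source, a scalar result yields a dict under `value` while dict and list results yield a list, so code reading the tracked results has to handle both shapes.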
user
Contains functions related to users.
get_current_username()
Returns the name of the user who is currently running the Python process.
Returns:
Type | Description |
---|---|
str | The name of the user on the local system. |
Source code in bandsaw/user.py
import os
import pwd


def get_current_username():
    """
    Returns the name of the user who is currently running the Python process.

    Returns:
        str: The name of the user on the local system.
    """
    # Look up the real user ID in the password database (Unix only).
    return pwd.getpwuid(os.getuid()).pw_name
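The `pwd` module used by `get_current_username()` is only available on Unix, and `pwd.getpwuid()` raises `KeyError` when the current user ID has no entry in the password database, which can happen in some containers. A more tolerant variant could fall back to `getpass.getuser()`. The function below is a hypothetical sketch, not part of bandsaw. Keep in mind that `getpass.getuser()` consults the `LOGNAME`, `USER`, `LNAME` and `USERNAME` environment variables before the password database, so it may return a different name than the `pwd` lookup.

```python
import getpass
import os

try:
    import pwd  # Unix-only module
except ImportError:  # e.g. on Windows
    pwd = None


def get_current_username_portable():
    """Hypothetical variant: prefer the password database, else fall back to getpass."""
    if pwd is not None:
        try:
            # Same lookup as get_current_username().
            return pwd.getpwuid(os.getuid()).pw_name
        except KeyError:
            pass  # the UID has no entry in the password database
    # Checks LOGNAME, USER, LNAME and USERNAME, then the password database if available.
    return getpass.getuser()


print(get_current_username_portable())
```

The password-database lookup comes first here so that, on Unix systems where it succeeds, the result matches what `get_current_username()` returns.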