API
A library for splitting python workflows into separate tasks
advice
🔗
Contains classes and functions for advising tasks.
Advice (Protocol)
🔗
Interface that needs to be implemented by an advice.
The interface is quite simple. One has to implement two different methods,
before(session)
and after(session)
, that are called during the process of
advising a task execution. Both take a single argument session
which contains an
instance of the class Session
. This object allows the individual advices to
influence the task execution by changing the way the task is being called or
making changes to the result.
after(self, session)
🔗
Called after the task is actually executed.
This methods allows the individual advice, to make changes to the result of
the task execution. The result can be retrieved from the session
.
In order to continue, the advice MUST either call session.proceed()
, which
will continue the process with current result
and the next advice in the
advice chain, or call session.conclude(result)
with a Result
instance,
which will set a different result and continue with it.
The default implementation will just call session.proceed()
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session |
bandsaw.session.Session |
The session of the execution. |
required |
Source code in bandsaw/advice.py
def after(self, session):
"""
Called after the task is actually executed.
This methods allows the individual advice, to make changes to the result of
the task execution. The result can be retrieved from the `session`.
In order to continue, the advice MUST either call `session.proceed()`, which
will continue the process with current `result` and the next advice in the
advice chain, or call `session.conclude(result)` with a `Result` instance,
which will set a different result and continue with it.
The default implementation will just call `session.proceed()`.
Args:
session (bandsaw.session.Session): The session of the execution.
"""
session.proceed()
before(self, session)
🔗
Called before the task is actually executed.
This methods allows the individual advice, to make changes to the way the
task execution is later executed. In order to continue, the advice MUST either
call session.proceed()
, which will continue the process with the next
advice in the advice chain, or call session.conclude(result)
with a Result
instance, which will skip the following advices and return without executing
the task execution at all.
The default implementation will just call session.proceed()
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session |
bandsaw.session.Session |
The session of the execution. |
required |
Source code in bandsaw/advice.py
def before(self, session):
"""
Called before the task is actually executed.
This methods allows the individual advice, to make changes to the way the
task execution is later executed. In order to continue, the advice MUST either
call `session.proceed()`, which will continue the process with the next
advice in the advice chain, or call `session.conclude(result)` with a `Result`
instance, which will skip the following advices and return without executing
the task execution at all.
The default implementation will just call `session.proceed()`.
Args:
session (bandsaw.session.Session): The session of the execution.
"""
session.proceed()
advise_task_with_chain(task, run, configuration, advice_chain='default')
🔗
Executes an Task
with additional advices.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
bandsaw.tasks.Task |
The task to be executed. |
required |
run |
bandsaw.run.Run |
The run definition for the task. |
required |
configuration |
bandsaw.config.Configuration |
The configuration which should be used during advising. |
required |
advice_chain |
str |
The name of the advice chain which contains the additional advices to be applied to the task. Defaults to 'default'. |
'default' |
Returns:
Type | Description |
---|---|
bandsaw.result.Result |
The result of the task execution. |
Source code in bandsaw/advice.py
def advise_task_with_chain(task, run, configuration, advice_chain='default'):
"""
Executes an `Task` with additional advices.
Args:
task (bandsaw.tasks.Task): The task to be executed.
run (bandsaw.run.Run): The run definition for the task.
configuration (bandsaw.config.Configuration): The configuration which should
be used during advising.
advice_chain (str): The name of the advice chain which contains the additional
advices to be applied to the task. Defaults to 'default'.
Returns:
bandsaw.result.Result: The result of the task execution.
"""
session = Session(task, run, configuration, advice_chain)
return session.initiate()
advices
special
🔗
Package that contains reusable Advice
classes
cache
🔗
Contains Advice that can cache task results in a local file system.
CachingAdvice (Advice)
🔗
Advice that caches results in a local filesystem.
Attributes:
Name | Type | Description |
---|---|---|
directory |
Path |
The path to the directory where the results are cached. |
__init__(self, directory)
special
🔗
Initialize self. See help(type(self)) for accurate signature.
Source code in bandsaw/advices/cache.py
def __init__(self, directory):
self.directory = pathlib.Path(directory)
logger.info("Caching artifacts in storage '%s'", self.directory)
super().__init__()
after(self, session)
🔗
Called after the task is actually executed.
This methods allows the individual advice, to make changes to the result of
the task execution. The result can be retrieved from the session
.
In order to continue, the advice MUST either call session.proceed()
, which
will continue the process with current result
and the next advice in the
advice chain, or call session.conclude(result)
with a Result
instance,
which will set a different result and continue with it.
The default implementation will just call session.proceed()
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session |
bandsaw.session.Session |
The session of the execution. |
required |
Source code in bandsaw/advices/cache.py
def after(self, session):
cache_item_path = pathlib.Path(session.context['cache-item-path'])
if not cache_item_path.exists():
cache_item_directory = cache_item_path.parent
if not cache_item_directory.exists():
cache_item_directory.mkdir(parents=True)
logger.info("Storing result in cache '%s'", cache_item_path)
with open(cache_item_path, 'wb') as stream:
session.serializer.serialize(session.result, stream)
session.proceed()
before(self, session)
🔗
Called before the task is actually executed.
This methods allows the individual advice, to make changes to the way the
task execution is later executed. In order to continue, the advice MUST either
call session.proceed()
, which will continue the process with the next
advice in the advice chain, or call session.conclude(result)
with a Result
instance, which will skip the following advices and return without executing
the task execution at all.
The default implementation will just call session.proceed()
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session |
bandsaw.session.Session |
The session of the execution. |
required |
Source code in bandsaw/advices/cache.py
def before(self, session):
artifact_id = session.task.task_id
revision_id = session.run.run_id
cache_item_path = self.directory / artifact_id / revision_id
session.context['cache-item-path'] = str(cache_item_path)
if cache_item_path.exists():
logger.info("Using result from cache '%s'", cache_item_path)
with open(cache_item_path, 'rb') as stream:
result = session.serializer.deserialize(stream)
session.conclude(result)
return
session.proceed()
logging
🔗
Contains an Advice
implementation which adds logging
LoggingAdvice (Advice)
🔗
An Advice which adds additional logging
after(self, session)
🔗
Called after the task is actually executed.
This methods allows the individual advice, to make changes to the result of
the task execution. The result can be retrieved from the session
.
In order to continue, the advice MUST either call session.proceed()
, which
will continue the process with current result
and the next advice in the
advice chain, or call session.conclude(result)
with a Result
instance,
which will set a different result and continue with it.
The default implementation will just call session.proceed()
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session |
bandsaw.session.Session |
The session of the execution. |
required |
Source code in bandsaw/advices/logging.py
def after(self, session):
logger.info(
"AFTER %s:%s with context %s",
session.task.task_id,
session.run.run_id,
session.context,
)
session.proceed()
before(self, session)
🔗
Called before the task is actually executed.
This methods allows the individual advice, to make changes to the way the
task execution is later executed. In order to continue, the advice MUST either
call session.proceed()
, which will continue the process with the next
advice in the advice chain, or call session.conclude(result)
with a Result
instance, which will skip the following advices and return without executing
the task execution at all.
The default implementation will just call session.proceed()
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session |
bandsaw.session.Session |
The session of the execution. |
required |
Source code in bandsaw/advices/logging.py
def before(self, session):
logger.info(
"BEFORE %s:%s with context %s",
session.task.task_id,
session.run.run_id,
session.context,
)
session.proceed()
subprocess
🔗
Contains Advice implementation that runs the execution in a subprocess
SubprocessAdvice (Advice)
🔗
Advice that runs in a subprocess
__init__(self, directory=None, interpreter=None)
special
🔗
Create a new instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
directory |
str |
The directory where temporary files are stored to
exchange data between both processes. If |
None |
interpreter |
bandsaw.interpreter.Interpreter |
The interpreter to use in
the subprocess. If |
None |
Source code in bandsaw/advices/subprocess.py
def __init__(self, directory=None, interpreter=None):
"""
Create a new instance.
Args:
directory (str): The directory where temporary files are stored to
exchange data between both processes. If `None` a temporary directory
is used.
interpreter (bandsaw.interpreter.Interpreter): The interpreter to use in
the subprocess. If `None` the same interpreter will be used.
"""
if directory is None:
self.directory = pathlib.Path(tempfile.mkdtemp())
else:
self.directory = pathlib.Path(directory)
logger.info("Using directory %s", self.directory)
self.interpreter = interpreter or Interpreter()
super().__init__()
after(self, session)
🔗
Called after the task is actually executed.
This methods allows the individual advice, to make changes to the result of
the task execution. The result can be retrieved from the session
.
In order to continue, the advice MUST either call session.proceed()
, which
will continue the process with current result
and the next advice in the
advice chain, or call session.conclude(result)
with a Result
instance,
which will set a different result and continue with it.
The default implementation will just call session.proceed()
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session |
bandsaw.session.Session |
The session of the execution. |
required |
Source code in bandsaw/advices/subprocess.py
def after(self, session):
logger.info("after called in process %d", os.getpid())
logger.info("Sub process created result %s", session.result)
session_out_path = session.context['subprocess']['session_out.path']
logger.info("Writing session with result to %s", session_out_path)
with io.FileIO(session_out_path, mode='w') as stream:
session.save(stream)
before(self, session)
🔗
Called before the task is actually executed.
This methods allows the individual advice, to make changes to the way the
task execution is later executed. In order to continue, the advice MUST either
call session.proceed()
, which will continue the process with the next
advice in the advice chain, or call session.conclude(result)
with a Result
instance, which will skip the following advices and return without executing
the task execution at all.
The default implementation will just call session.proceed()
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session |
bandsaw.session.Session |
The session of the execution. |
required |
Source code in bandsaw/advices/subprocess.py
def before(self, session):
logger.info("before called in process %d", os.getpid())
session_id = session.run.run_id
session_in_file, session_in_path = tempfile.mkstemp(
'.zip', 'in-' + session_id + '-', self.directory
)
session_out_file, session_out_path = tempfile.mkstemp(
'.zip', 'out-' + session_id + '-', self.directory
)
session.context['subprocess'] = {
'session_in.path': session_in_path,
'session_out.path': session_out_path,
}
logger.info("Writing session to %s", session_in_path)
with io.FileIO(session_in_file, mode='w') as stream:
session.save(stream)
logger.info(
"Running subprocess using interpreter %s", self.interpreter.executable
)
environment = self.interpreter.environment
environment['PYTHONPATH'] = ':'.join(self.interpreter.path)
subprocess.check_call(
[
self.interpreter.executable,
'-m',
'bandsaw.advices.subprocess',
'--input',
session_in_path,
'--output',
session_out_path,
],
env=environment,
)
logger.info("Sub process exited")
with io.FileIO(session_out_file, mode='r') as stream:
session.restore(stream)
session.proceed()
config
🔗
Contains the class and functions to configure bandsaw.
Configuration
🔗
Class that represents a configuration for bandsaw.
add_advice_chain(self, *advices, *, name='default')
🔗
Add a new advice chain to the configuration.
Each advice chain has a unique name
. If multiple chains with the same name
are added to the configuration, the last chain overwrites all previous chains.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*advices |
bandsaw.advice.Advice |
A tuple of advices for this chain. |
() |
name |
str |
The name of the advice chain, defaults to 'default' if not specified. |
'default' |
Returns:
Type | Description |
---|---|
bandsaw.config.Configuration |
The configuration to which the chain was added. |
Source code in bandsaw/config.py
def add_advice_chain(self, *advices, name='default'):
"""
Add a new advice chain to the configuration.
Each advice chain has a unique `name`. If multiple chains with the same name
are added to the configuration, the last chain overwrites all previous chains.
Args:
*advices (bandsaw.advice.Advice): A tuple of advices for this chain.
name (str): The name of the advice chain, defaults to 'default' if not
specified.
Returns:
bandsaw.config.Configuration: The configuration to which the chain was
added.
"""
self._advice_chains[name] = advices
return self
add_extension(self, extension)
🔗
Add an Extension
to the configuration.
Extensions
are objects that can implement callbacks to be informed by
bandsaw about certain conditions, e.g. the creation of new tasks or the final
result of an execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
extension |
bandsaw.extension.Extension |
An object implementing the
|
required |
Returns:
Type | Description |
---|---|
bandsaw.config.Configuration |
The configuration to which the extension was added. |
Source code in bandsaw/config.py
def add_extension(self, extension):
"""
Add an `Extension` to the configuration.
`Extensions` are objects that can implement callbacks to be informed by
bandsaw about certain conditions, e.g. the creation of new tasks or the final
result of an execution.
Args:
extension (bandsaw.extension.Extension): An object implementing the
`Extension`.
Returns:
bandsaw.config.Configuration: The configuration to which the extension was
added.
"""
self.extensions.append(extension)
return self
get_advice_chain(self, name)
🔗
Returns the advice chain with the given name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the wanted advice chain. |
required |
Returns:
Type | Description |
---|---|
List[bandsaw.advice.Advice] |
The advice chain with the given name. |
Exceptions:
Type | Description |
---|---|
KeyError |
If no chain with the specified name is configured. |
Source code in bandsaw/config.py
def get_advice_chain(self, name):
"""
Returns the advice chain with the given name.
Args:
name (str): Name of the wanted advice chain.
Returns:
List[bandsaw.advice.Advice]: The advice chain with the given name.
Raises:
KeyError: If no chain with the specified name is configured.
"""
return self._advice_chains.get(name)
set_serializer(self, serializer)
🔗
Sets the serialize which defines how tasks and results will be serialized.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
serializer |
bandsaw.serialization.Serializer |
The serializer to use for serializing objects. |
required |
Returns:
Type | Description |
---|---|
bandsaw.config.Configuration |
The configuration to which the extension was added. |
Source code in bandsaw/config.py
def set_serializer(self, serializer):
"""
Sets the serialize which defines how tasks and results will be serialized.
Args:
serializer (bandsaw.serialization.Serializer): The serializer to use for
serializing objects.
Returns:
bandsaw.config.Configuration: The configuration to which the extension was
added.
"""
self.serializer = serializer
return self
get_configuration(configuration_module=None)
🔗
Return a configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
configuration_module |
str |
The module name of a module, which contains the
configuration. The module needs to define a member 'configuration', which
contains an instance of |
None |
Returns:
Type | Description |
---|---|
bandsaw.config.Configuration |
The configuration. |
Exceptions:
Type | Description |
---|---|
ModuleNotFoundError |
If no module exists with name |
LookupError |
If the module doesn't contain a variable 'configuration`. |
TypeError |
If the variable |
Source code in bandsaw/config.py
def get_configuration(configuration_module=None):
"""
Return a configuration.
Args:
configuration_module (str): The module name of a module, which contains the
configuration. The module needs to define a member 'configuration', which
contains an instance of `Configuration`. If no module name is given, a
default configuration is returned based on the value of the
`BANDSAW_CONFIG_MODULE` environment variable. If this variable is not set,
we default to 'bandsaw_config'.
Returns:
bandsaw.config.Configuration: The configuration.
Raises:
ModuleNotFoundError: If no module exists with name `configuration_module`.
LookupError: If the module doesn't contain a variable 'configuration`.
TypeError: If the variable `configuration` is not of type `Configuration`.
"""
if configuration_module is None:
default_configuration_module_name = os.getenv(
CONFIGURATION_MODULE_ENV_VARIABLE,
CONFIGURATION_MODULE_DEFAULT,
)
configuration_module = default_configuration_module_name
if configuration_module not in _configurations:
try:
_load_configuration_module(configuration_module)
except ModuleNotFoundError:
logger.warning(
"No module found for config %s",
configuration_module,
)
raise
return _configurations[configuration_module]
context
🔗
Classes that represent the context used in advising tasks.
Context (SerializableValue)
🔗
Class for representing the context for advising tasks.
The context contains of a set of arbitrary key-value mappings that can be used
by the Advice
classes to store state or communicate with other advices.
attributes
property
readonly
🔗
A set of arbitrary key-value mappings for the Advice
classes.
Advice
can add to this mapping and use this as a way of keeping state.
deserialize(values)
classmethod
🔗
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/context.py
@classmethod
def deserialize(cls, values):
return Context(values['attributes'])
serialized(self)
🔗
Returns a serializable representation of the value.
Source code in bandsaw/context.py
def serialized(self):
data = {
'attributes': self._attributes,
}
return data
decorator
🔗
Contains decorators that allow to define individual tasks
task(*task_args, *, config=None, chain=None, **task_kwargs)
🔗
Decorator that is used to define a function as as task.
The decorator can be used in two different ways, standalone:
Examples:
>>> @task
... def my_task_function():
... pass
or with additional configuration.
Examples:
>>> @task(config='my.config')
... def my_task_function():
... pass
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
str |
The name of the configuration module to use for this task. If not given, the default configuration is used. |
None |
chain |
str |
The name of the advice chain to use for advising this task. If not given, 'default' is used. |
None |
*task_args |
Positional args given to the decorator OR the decorated function.
If the decorator is used WITHOUT providing additional configuration,
|
() |
|
**task_kwargs |
Keyword args given to the decorator.
If the decorator is used WITHOUT providing additional configuration,
|
{} |
Returns:
Type | Description |
---|---|
Callable |
Returns a callable that wraps the decorated function. |
Exceptions:
Type | Description |
---|---|
ModuleNotFoundError |
If the configured configuration module does not exist. |
ValueError |
If the specified advice chain does not exist. |
RuntimeError |
If the task is configured with multiple positional arguments. |
Source code in bandsaw/decorator.py
def task(*task_args, config=None, chain=None, **task_kwargs):
"""
Decorator that is used to define a function as as task.
The decorator can be used in two different ways, standalone:
Example:
>>> @task
... def my_task_function():
... pass
or with additional configuration.
Example:
>>> @task(config='my.config')
... def my_task_function():
... pass
Args:
config (str): The name of the configuration module to use for this task.
If not given, the default configuration is used.
chain (str): The name of the advice chain to use for advising this task.
If not given, 'default' is used.
*task_args: Positional args given to the decorator OR the decorated function.
If the decorator is used WITHOUT providing additional configuration,
`task_args` contains a tuple with a single item that is the function to be
used as a task. If there is additional configuration given, `task_args`
contains the positional arguments of the call of the decorator.
**task_kwargs: Keyword args given to the decorator.
If the decorator is used WITHOUT providing additional configuration,
`task_kwargs` is an empty dictionary. If there is additional configuration
given, `task_kwargs`contains the keyword arguments of the call of the
decorator.
Returns:
Callable: Returns a callable that wraps the decorated function.
Raises:
ModuleNotFoundError: If the configured configuration module does not exist.
ValueError: If the specified advice chain does not exist.
RuntimeError: If the task is configured with multiple positional arguments.
"""
config_module = config
configuration = get_configuration(config_module)
chain_name = chain or 'default'
chain = configuration.get_advice_chain(chain_name)
if chain is None:
raise ValueError(f"Unknown advice chain {chain_name}")
def decorate_function(func):
logger.info("Decorate function '%s'", func)
logger.info("Creating task for function '%s'", func)
the_task = Task.create_task(func)
def inner(*func_args, **func_kwargs):
run_id = _calculate_run_id(
func_args,
func_kwargs,
configuration.serializer,
)
run = Run(run_id, func_args, func_kwargs)
result = advise_task_with_chain(
the_task,
run,
configuration,
chain_name,
)
if result.exception:
raise result.exception
return result.value
inner.__wrapped__ = func
inner.bandsaw_task = the_task
inner.bandsaw_configuration = configuration
return inner
if len(task_args) == 1 and len(task_kwargs) == 0:
return decorate_function(task_args[0])
if len(task_args) == 0 and len(task_kwargs) > 0:
return decorate_function
# This shouldn't happen if the decorator is properly used.
raise RuntimeError("Invalid 'task' decorator.")
extensions
🔗
Contains an API for extensions that can be used in bandsaw
Extension
🔗
Class that defines the interface of extensions.
An extension can define different callbacks that are called by bandsaw and allows
to extend some existing functionality (e.g. by setting additional values in a
context before it is handled by all advices) or integrate other systems.
Other than Advice
, an Extension
is globally defined in a config and therefore
applies to all tasks.
on_after_advice(self, task, run, context, result)
🔗
Called after bandsaw advises a task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
bandsaw.tasks.Task |
The task which was advised. |
required |
run |
bandsaw.run.Run |
The run which contains the parametrization of the task. |
required |
context |
bandsaw.context.Context |
The context which was used during the advice. |
required |
result |
bandsaw.result.Result |
The result of the call. |
required |
Source code in bandsaw/extensions.py
def on_after_advice(self, task, run, context, result):
"""
Called after bandsaw advises a task.
Args:
task (bandsaw.tasks.Task): The task which was advised.
run (bandsaw.run.Run): The run which contains the parametrization of the
task.
context (bandsaw.context.Context): The context which was used during the
advice.
result (bandsaw.result.Result): The result of the call.
"""
on_before_advice(self, task, run, context)
🔗
Called before bandsaw advises a task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
bandsaw.tasks.Task |
The task which will be advised. |
required |
run |
bandsaw.run.Run |
The run which contains the parametrization of the task. |
required |
context |
bandsaw.context.Context |
The context which will be used during the advice. The context can be extended by the extension. |
required |
Source code in bandsaw/extensions.py
def on_before_advice(self, task, run, context):
"""
Called before bandsaw advises a task.
Args:
task (bandsaw.tasks.Task): The task which will be advised.
run (bandsaw.run.Run): The run which contains the parametrization of the
task.
context (bandsaw.context.Context): The context which will be used during
the advice. The context can be extended by the extension.
"""
on_init(self, configuration)
🔗
Called when a bandsaw configuration has been initialized.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
configuration |
bandsaw.config.Configuration |
The configuration object which contains the config that has been loaded. |
required |
Source code in bandsaw/extensions.py
def on_init(self, configuration):
"""
Called when a bandsaw configuration has been initialized.
Args:
configuration (bandsaw.config.Configuration): The configuration object
which contains the config that has been loaded.
"""
identifier
🔗
Functions for generating identifier for arbitrary python objects.
identifier_from_bytes(buffer)
🔗
Derive an identifier from a bytebuffer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
buffer |
Union[bytes,bytearray] |
The binary data from which to derive an identifier. |
required |
Returns:
Type | Description |
---|---|
str |
The identifier in form of a string of a hexadecimal number. |
Source code in bandsaw/identifier.py
def identifier_from_bytes(buffer):
"""
Derive an identifier from a bytebuffer.
Args:
buffer (Union[bytes,bytearray]): The binary data from which to derive an
identifier.
Returns:
str: The identifier in form of a string of a hexadecimal number.
"""
identifier = hashlib.sha256(buffer).hexdigest()
return identifier
identifier_from_string(string)
🔗
Derive an identifier from a string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
string |
str |
The string from which to derive an identifier. |
required |
Returns:
Type | Description |
---|---|
str |
The identifier in form of a string of a hexadecimal number. |
Source code in bandsaw/identifier.py
def identifier_from_string(string):
"""
Derive an identifier from a string.
Args:
string (str): The string from which to derive an identifier.
Returns:
str: The identifier in form of a string of a hexadecimal number.
"""
identifier = identifier_from_bytes(string.encode('utf-8'))
return identifier
interpreter
🔗
Contains classes regarding python interpreters
Interpreter (SerializableValue)
🔗
Class for representing different python interpreters.
This class is used to contain the information about specific python interpreters that are used within the library. In order to support multiple different interpreters there will be the option to define the interpreter as part of config. Currently only a single interpreter is automatically defined.
environment
property
readonly
🔗
The environment variables to be set for the interpreter.
path
property
readonly
🔗
The python path items that will be used.
__init__(self, path=None, executable=None)
special
🔗
Create a new interpreter instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
List[str] |
A list of directory paths, to be used as $PYTHONPATH.
If |
None |
executable |
str |
The path to the python executable for this interpreter.
If |
None |
Source code in bandsaw/interpreter.py
def __init__(
self,
path=None,
executable=None,
):
"""
Create a new interpreter instance.
Args:
path (List[str]): A list of directory paths, to be used as $PYTHONPATH.
If `None` the current `sys.path` is used.
executable (str): The path to the python executable for this interpreter.
If `None` the current `sys.executable` is used.
"""
self._path = path or tuple(sys.path)
self.executable = executable or sys.executable
self._environment = {}
deserialize(values)
classmethod
🔗
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/interpreter.py
@classmethod
def deserialize(cls, values):
return Interpreter(
path=values['path'],
executable=values['executable'],
).set_environment(**values['environment'])
serialized(self)
🔗
Returns a serializable representation of the value.
Source code in bandsaw/interpreter.py
def serialized(self):
return {
'path': self._path,
'executable': self.executable,
'environment': self._environment,
}
set_environment(self, **environment)
🔗
Set the environment variables to use for this interpreter.
A call to this methods overwrites all variables that have been set previously.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**environment |
Arbitrary keyword arguments where the name of the keyword corresponds to the name of the environment variable and the values will be the values set in the environment. |
{} |
Source code in bandsaw/interpreter.py
def set_environment(self, **environment):
"""
Set the environment variables to use for this interpreter.
A call to this methods overwrites all variables that have been set previously.
Args:
**environment: Arbitrary keyword arguments where the name of the keyword
corresponds to the name of the environment variable and the values
will be the values set in the environment.
"""
self._environment = environment
return self
io
🔗
Contains classes related to input/output of data.
BytearrayGeneratorToStream (RawIOBase)
🔗
Stream that reads from a generator that yields bytes/bytearrays.
read_stream_to_generator(stream, buffer_size=8192)
🔗
Read from a stream into a generator yielding bytes
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
io.Stream |
The stream to read bytes from. |
required |
buffer_size |
int |
The buffer size to read. Each individual |
8192 |
!!! yields
bytes: A byte buffer of maximum size of buffer_size
.
Source code in bandsaw/io.py
def read_stream_to_generator(stream, buffer_size=8192):
"""
Read from a stream into a generator yielding `bytes`.
Args:
stream (io.Stream): The stream to read bytes from.
buffer_size (int): The buffer size to read. Each individual `bytes` buffer
returned by the generator has a maximum size of `buffer_size`. Defaults
to `8192` if not set.
Yields:
bytes: A byte buffer of maximum size of `buffer_size`.
"""
buffer = bytearray(buffer_size)
while True:
read = stream.readinto(buffer)
if read:
yield buffer[:read]
else:
break
modules
🔗
Utility functions for handling python modules.
get_loaded_module_name_by_path(file_path)
🔗
Determine the name of an already loaded module by its file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str |
File path of the python file containing the module. |
required |
Returns:
Type | Description |
---|---|
str |
The name of the module, or |
Source code in bandsaw/modules.py
def get_loaded_module_name_by_path(file_path):
"""
Determine the name of an already loaded module by its file path.
Args:
file_path (str): File path of the python file containing the module.
Returns:
str: The name of the module, or `None` if the file isn't loaded as a module.
"""
real_path = os.path.realpath(file_path)
for name, module in sys.modules.items():
if hasattr(module, '__file__'):
module_path = os.path.realpath(module.__file__)
if module_path == real_path:
return name
return None
import_object(object_name, module_name)
🔗
Import a python object from a module.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
object_name |
str |
The name under which the object is defined in the module. |
required |
module_name |
str |
The name of the module from which the object should be imported. |
required |
Returns:
Type | Description |
---|---|
object |
The python object defined under the name. |
Exceptions:
Type | Description |
---|---|
AttributeError |
If nothing is defined with name |
ModuleNotFoundError |
If no module exists with the name |
Source code in bandsaw/modules.py
def import_object(object_name, module_name):
"""
Import a python object from a module.
Args:
object_name (str): The name under which the object is defined in the module.
module_name (str): The name of the module from which the object should be
imported.
Returns:
object: The python object defined under the name.
Raises:
AttributeError: If nothing is defined with name `object_name` in the
referenced module.
ModuleNotFoundError: If no module exists with the name `module_name`.
"""
module = importlib.import_module(module_name)
return getattr(module, object_name)
object_as_import(obj)
🔗
Returns the name and module of an object, which can be used for importing it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
object |
An arbitrary Python object. |
required |
Returns:
Type | Description |
---|---|
Tuple(str, str) |
Returns a tuple of the object name and the module name in which the object is defined. |
Exceptions:
Type | Description |
---|---|
ValueError |
If |
!!! note
If obj
is defined within the __main__
script, the function tries to
determine a name for the __main__
module, under which it could be imported
from other scripts.
Source code in bandsaw/modules.py
def object_as_import(obj):
"""
Returns the name and module of an object, which can be used for importing it.
Args:
obj (object): An arbitrary Python object.
Returns:
Tuple(str, str): Returns a tuple of the object name and the module name in
which the object is defined.
Raises:
ValueError: If `obj` doesn't have a name that can be directly imported, e.g.
because it is defined within a local class.
Note:
If `obj` is defined within the `__main__` script, the function tries to
determine a name for the `__main__` module, under which it could be imported
from other scripts.
"""
object_name = obj.__name__
module_name = obj.__module__
if module_name == '__main__':
module_file_path = sys.modules['__main__'].__file__
module_name = _guess_module_name_by_path(module_file_path, sys.path)
if '<locals>' in obj.__qualname__:
raise ValueError("Can't import local functions.")
return object_name, module_name
result
🔗
Contains code for representing the result of tasks.
Result (SerializableValue)
🔗
Class to encapsulate the result of a task execution.
Attributes:
Name | Type | Description |
---|---|---|
value |
Any |
The value that is returned by the task. |
exception |
Exception |
The exception that was raised during execution, |
deserialize(values)
classmethod
🔗
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/result.py
@classmethod
def deserialize(cls, values):
value = values["value"]
exception = values["exception"]
return Result(value=value, exception=exception)
serialized(self)
🔗
Returns a serializable representation of the value.
Source code in bandsaw/result.py
def serialized(self):
values = {
"value": self.value,
"exception": self.exception,
}
return values
run
🔗
Contains classes and functions around a run of tasks
Run (SerializableValue)
🔗
Class that defines a run of a Task
.
Attributes:
Name | Type | Description |
---|---|---|
run_id |
str |
A string identifying this run. |
args |
tuple[Any] |
The positional arguments for the task to use in this execution. |
kwargs |
Dict[Any,Any] |
The keyword arguments for the task to use in this execution. |
deserialize(values)
classmethod
🔗
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/run.py
@classmethod
def deserialize(cls, values):
return Run(
values['run_id'],
values['args'],
values['kwargs'],
)
serialized(self)
🔗
Returns a serializable representation of the value.
Source code in bandsaw/run.py
def serialized(self):
return {
'run_id': self.run_id,
'args': self.args,
'kwargs': self.kwargs,
}
serialization
special
🔗
Module for the package bandsaw.serialization. Contains all public types.
json
🔗
Contains a Serializer that allows to serialize objects to JSON.
JsonSerializer (Serializer)
🔗
A Serializer
which serializes objects to JSON.
Attributes:
Name | Type | Description |
---|---|---|
value_serializers |
List[ValueSerializer] |
A list of serializers that are used for serialization of custom types. |
deserialize(self, stream)
🔗
Deserialize a value/object from a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
io.Stream |
The binary stream from where the serialized value is read. |
required |
Returns:
Type | Description |
---|---|
Any |
The object/value which was deserialized. |
Source code in bandsaw/serialization/json.py
def deserialize(self, stream):
"""
Deserialize a value/object from a binary stream.
Args:
stream (io.Stream): The binary stream from where the serialized value is
read.
Returns:
Any: The object/value which was deserialized.
"""
text_stream = io.TextIOWrapper(stream, encoding='utf-8')
result = json.load(
text_stream,
cls=_ExtensibleJSONDecoder,
value_serializers=self.value_serializers,
)
text_stream.detach()
return result
serialize(self, value, stream)
🔗
Serialize a value/object into a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The object/value to be serialized. |
required |
stream |
io.Stream |
The binary stream where the serialized value is written. |
required |
Source code in bandsaw/serialization/json.py
def serialize(self, value, stream):
"""
Serialize a value/object into a binary stream.
Args:
value (Any): The object/value to be serialized.
stream (io.Stream): The binary stream where the serialized value is
written.
"""
text_stream = io.TextIOWrapper(stream, encoding='utf-8')
json.dump(
value,
text_stream,
cls=_ExtensibleJSONEncoder,
value_serializers=self.value_serializers,
allow_nan=False,
indent=None,
separators=(',', ':'),
sort_keys=True,
)
text_stream.detach()
pickle
🔗
Contains a Serializer which uses pickle for serializing values.
PickleSerializer (Serializer)
🔗
A Serializer
which serializes objects using pickle.
deserialize(self, stream)
🔗
Deserialize a value/object from a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
io.Stream |
The binary stream from where the serialized value is read. |
required |
Returns:
Type | Description |
---|---|
Any |
The object/value which was deserialized. |
Source code in bandsaw/serialization/pickle.py
def deserialize(self, stream):
"""
Deserialize a value/object from a binary stream.
Args:
stream (io.Stream): The binary stream from where the serialized value is
read.
Returns:
Any: The object/value which was deserialized.
"""
return pickle.load(stream)
serialize(self, value, stream)
🔗
Serialize a value/object into a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The object/value to be serialized. |
required |
stream |
io.Stream |
The binary stream where the serialized value is written. |
required |
Source code in bandsaw/serialization/pickle.py
def serialize(self, value, stream):
"""
Serialize a value/object into a binary stream.
Args:
value (Any): The object/value to be serialized.
stream (io.Stream): The binary stream where the serialized value is
written.
"""
pickle.dump(value, stream)
serializer
🔗
Base classes for serializers which allow to serialize python values.
Serializer (ABC)
🔗
Interface for Serializer
which serialize objects
deserialize(self, stream)
🔗
Deserialize a value/object from a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
io.Stream |
The binary stream from where the serialized value is read. |
required |
Returns:
Type | Description |
---|---|
Any |
The object/value which was deserialized. |
Source code in bandsaw/serialization/serializer.py
@abc.abstractmethod
def deserialize(self, stream):
"""
Deserialize a value/object from a binary stream.
Args:
stream (io.Stream): The binary stream from where the serialized value is
read.
Returns:
Any: The object/value which was deserialized.
"""
serialize(self, value, stream)
🔗
Serialize a value/object into a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The object/value to be serialized. |
required |
stream |
io.Stream |
The binary stream where the serialized value is written. |
required |
Source code in bandsaw/serialization/serializer.py
@abc.abstractmethod
def serialize(self, value, stream):
"""
Serialize a value/object into a binary stream.
Args:
value (Any): The object/value to be serialized.
stream (io.Stream): The binary stream where the serialized value is
written.
"""
values
🔗
A collection of classes for serializing custom objects.
ExceptionSerializer (ValueSerializer)
🔗
A ValueSerializer for serializing exceptions.
The serializer saves only the type and the args
attribute of the exception,
therefore it won't work for all exception types, but it should cover the most.
Other attributes of the exception, e.g. stacktrace etc. are discarded.
can_serialize_value(self, value)
🔗
Returns if a serializer can serialize a specific value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be serialized. |
required |
Returns:
Type | Description |
---|---|
boolean |
|
Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
return isinstance(value, Exception)
deserialize_value(self, representation)
🔗
Returns a deserialized value from its serialized representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
representation |
Any |
The serialized representation of the value. |
required |
Returns:
Type | Description |
---|---|
Any |
The deserialized value. |
Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
module_name = representation['module']
type_name = representation['type']
module = importlib.import_module(module_name)
value_type = getattr(module, type_name)
return value_type(*representation['args'])
serialize_value(self, value)
🔗
Returns a serialized representation of the given value.
The returned representation can use standard python types like primitive values, lists or dicts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be serialized. |
required |
Returns:
Type | Description |
---|---|
Any |
The serialized representation of the value. |
Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
state = {
'type': type(value).__name__,
'module': type(value).__module__,
'args': value.args,
}
return state
SerializableValue (ABC)
🔗
Interface for types that can serialize themselves.
deserialize(values)
classmethod
🔗
Returns a new instance of a value from its serialized representation.
Source code in bandsaw/serialization/values.py
@classmethod
@abc.abstractmethod
def deserialize(cls, values):
"""Returns a new instance of a value from its serialized representation."""
serialized(self)
🔗
Returns a serializable representation of the value.
Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def serialized(self):
"""Returns a serializable representation of the value."""
SerializableValueSerializer (ValueSerializer)
🔗
A ValueSerializer for serializing subclasses of SerializableValue
.
The serializer uses the methods defined in SerializableValue
and implemented
by the individual classes to serialize values. It stores the type of the value
and its serialized representation and allows to recreate the value from this
information.
can_serialize_value(self, value)
🔗
Returns if a serializer can serialize a specific value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be serialized. |
required |
Returns:
Type | Description |
---|---|
boolean |
|
Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
return isinstance(value, SerializableValue)
deserialize_value(self, representation)
🔗
Returns a deserialized value from its serialized representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
representation |
Any |
The serialized representation of the value. |
required |
Returns:
Type | Description |
---|---|
Any |
The deserialized value. |
Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
module_name = representation['module']
type_name = representation['type']
module = importlib.import_module(module_name)
value_type = getattr(module, type_name)
return value_type.deserialize(representation['serialized'])
serialize_value(self, value)
🔗
Returns a serialized representation of the given value.
The returned representation can use standard python types like primitive values, lists or dicts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be serialized. |
required |
Returns:
Type | Description |
---|---|
Any |
The serialized representation of the value. |
Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
state = {
'type': type(value).__name__,
'module': type(value).__module__,
'serialized': value.serialized(),
}
return state
TupleSerializer (ValueSerializer)
🔗
A ValueSerializer for serializing tuples.
The serializer supports normal tuples as well as named tuples. When namedtuples are deserialized it first tries to reuse an existing namedtople type. If the type can't be imported or reused, a new namedtuple type with the same name and fields is created on the fly.
can_serialize_value(self, value)
🔗
Returns if a serializer can serialize a specific value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be serialized. |
required |
Returns:
Type | Description |
---|---|
boolean |
|
Source code in bandsaw/serialization/values.py
def can_serialize_value(self, value):
return isinstance(value, tuple)
deserialize_value(self, representation)
🔗
Returns a deserialized value from its serialized representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
representation |
Any |
The serialized representation of the value. |
required |
Returns:
Type | Description |
---|---|
Any |
The deserialized value. |
Source code in bandsaw/serialization/values.py
def deserialize_value(self, representation):
if representation['type'] == 'namedtuple':
# try to import the namedtuple type
module_name = representation['module']
type_name = representation['name']
try:
module = importlib.import_module(module_name)
tuple_type = getattr(module, type_name)
except (ImportError, AttributeError) as error:
logger.warning(
"Error importing namedtuple, trying to recreate it: %s", error
)
# Recreate a new type
field_names = ' '.join(representation['fields'])
tuple_type = collections.namedtuple(
type_name, field_names, module=module_name
)
return tuple_type(*representation['items'])
return tuple(representation['items'])
serialize_value(self, value)
🔗
Returns a serialized representation of the given value.
The returned representation can use standard python types like primitive values, lists or dicts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be serialized. |
required |
Returns:
Type | Description |
---|---|
Any |
The serialized representation of the value. |
Source code in bandsaw/serialization/values.py
def serialize_value(self, value):
if hasattr(value, '_fields'):
state = {
'type': 'namedtuple',
'fields': list(value._fields),
'name': type(value).__name__,
'module': type(value).__module__,
'items': list(value),
}
else:
state = {
'type': 'tuple',
'items': list(value),
}
return state
ValueSerializer (ABC)
🔗
Interface for serializers that can serialize custom values.
can_serialize_value(self, value)
🔗
Returns if a serializer can serialize a specific value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be serialized. |
required |
Returns:
Type | Description |
---|---|
boolean |
|
Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def can_serialize_value(self, value):
"""
Returns if a serializer can serialize a specific value.
Args:
value (Any): The value that should be serialized.
Returns:
boolean: `True` if this serializer can serialize the given value,
otherwise `False`.
"""
deserialize_value(self, representation)
🔗
Returns a deserialized value from its serialized representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
representation |
Any |
The serialized representation of the value. |
required |
Returns:
Type | Description |
---|---|
Any |
The deserialized value. |
Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def deserialize_value(self, representation):
"""
Returns a deserialized value from its serialized representation.
Args:
representation (Any): The serialized representation of the value.
Returns:
Any: The deserialized value.
"""
serialize_value(self, value)
🔗
Returns a serialized representation of the given value.
The returned representation can use standard python types like primitive values, lists or dicts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be serialized. |
required |
Returns:
Type | Description |
---|---|
Any |
The serialized representation of the value. |
Source code in bandsaw/serialization/values.py
@abc.abstractmethod
def serialize_value(self, value):
"""
Returns a serialized representation of the given value.
The returned representation can use standard python types like primitive
values, lists or dicts.
Args:
value (Any): The value that should be serialized.
Returns:
Any: The serialized representation of the value.
"""
session
🔗
Contains classes for representing an advising session
Session
🔗
Class that handles the advising of an execution.
A Session
object is given to the individual advices that are called to advise
the execution. By calling the appropriate methods like def proceed(self)
to
continue or conclude()
to end with a result, the advices can influence the final
result.
Additionally, the session provides access to the context
, which allows advices
to keep state, the execution
that is advised, the configuration
that is used
for advising and the result
of the execution.
Attributes:
Name | Type | Description |
---|---|---|
task |
bandsaw.tasks.Task |
The task that is executed. |
run |
bandsaw.run.Run |
The run definition for the task. |
context |
bandsaw.context.Context |
The context that can be used for advices to store state. |
result |
bandsaw.result.Result |
Result of the task if already computed.
Otherwise |
_configuration |
bandsaw.config.Configuration |
The configuration that is being used for advising this task. |
serializer
property
readonly
🔗
The serializer that can be used for serializing values.
__init__(self, task=None, run=None, configuration=None, advice_chain='default')
special
🔗
Create a new session.
Source code in bandsaw/session.py
def __init__(
self,
task=None,
run=None,
configuration=None,
advice_chain='default',
):
"""
Create a new session.
"""
self.task = task
self.run = run
self.context = {}
self.result = None
self._configuration = configuration
self._advice_chain = advice_chain
self._moderator = None
conclude(self, result)
🔗
Conclude the process of advising with a Result
.
This can be used in two cases:
-
Concluding BEFORE the task was actually executed. This will skip all subsequent advices defined later in the advice chain and will skip the task execution. The given
result
will then be used as preliminary result. All advices that are defined before the calling advice in the advice chain will still be called with thereafter(session)
method. -
Concluding AFTER the task was actually executed. This will just change the
result
of the session and continue will all following advices.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
result |
bandsaw.result.Result |
The result to conclude with. |
required |
Source code in bandsaw/session.py
def conclude(self, result):
"""
Conclude the process of advising with a `Result`.
This can be used in two cases:
1. Concluding BEFORE the task was actually executed. This will skip all
subsequent advices defined later in the advice chain and will skip the
task execution. The given `result` will then be used as preliminary result.
All advices that are defined before the calling advice in the advice chain
will still be called with there `after(session)` method.
2. Concluding AFTER the task was actually executed. This will just change the
`result` of the session and continue will all following advices.
Args:
result (bandsaw.result.Result): The result to conclude with.
"""
self.result = result
self._moderator.skip(self)
initiate(self)
🔗
Start the process of advising an execution.
Returns:
Type | Description |
---|---|
bandsaw.result.Result |
The final result of the execution after all advices. |
Source code in bandsaw/session.py
def initiate(self):
"""
Start the process of advising an execution.
Returns:
bandsaw.result.Result: The final result of the execution after all
advices.
"""
self._moderator = _Moderator(
self._configuration.get_advice_chain(self._advice_chain)
)
logger.debug("running extensions before advice")
for extension in self._configuration.extensions:
extension.on_before_advice(self.task, self.run, self.context)
self.proceed()
if not self._moderator.is_finished:
raise RuntimeError(
f"Not all advice has been applied. "
f"Misbehaving advice {self._moderator.current_advice}"
)
logger.debug("running extensions after advice")
for extension in self._configuration.extensions:
extension.on_after_advice(self.task, self.run, self.context, self.result)
return self.result
proceed(self)
🔗
Continue the process of advising with the next advice.
Source code in bandsaw/session.py
def proceed(self):
"""
Continue the process of advising with the next advice.
"""
self._moderator.next(self)
restore(self, stream)
🔗
Resume a prior suspended session.
Source code in bandsaw/session.py
def restore(self, stream):
"""
Resume a prior suspended session.
"""
self._load_from_zip(stream)
return self
save(self, stream)
🔗
Suspend the session to be resumed later or elsewhere.
Source code in bandsaw/session.py
def save(self, stream):
"""
Suspend the session to be resumed later or elsewhere.
"""
self._store_as_zip(stream)
tasks
🔗
Contains classes and functions representing different types of tasks
Task (SerializableValue, ABC)
🔗
Base-class for different types of Tasks
that can be executed
Attributes:
Name | Type | Description |
---|---|---|
task_id |
str |
A unique identifier for the individual tasks. |
source |
str |
The python source code as string which defines the task. |
bytecode |
bytes |
The compiled byte code of the task definition. |
bytecode
property
readonly
🔗
The compiled byte code of the task definition as bytes
.
source
property
readonly
🔗
The python source code as str
which defines the task.
create_task(obj)
classmethod
🔗
Factory for creating a task for different Python objects.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
Any |
Python object that should be run as a task. |
required |
Returns:
Type | Description |
---|---|
bandsaw.tasks.Task |
Instance of |
Exceptions:
Type | Description |
---|---|
TypeError |
If there is no support for this type of python object. |
Source code in bandsaw/tasks.py
@classmethod
def create_task(cls, obj):
"""
Factory for creating a task for different Python objects.
Args:
obj (Any): Python object that should be run as a task.
Returns:
bandsaw.tasks.Task: Instance of `Task` class that allows to execute the
task.
Raises:
TypeError: If there is no support for this type of python object.
"""
if isinstance(obj, types.FunctionType):
if '.<locals>.' in obj.__qualname__:
return _FunctionWithClosureTask(obj)
function_name, module_name = object_as_import(obj)
return _FunctionTask(function_name, module_name)
raise TypeError(f"Unsupported task object of type {type(obj)}")
execute_run(self, run)
🔗
Execute the task with the arguments specified by the run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run |
bandsaw.run.Run |
The run definition which contains how the task should be run. |
required |
Returns:
Type | Description |
---|---|
bandsaw.result.Result |
A |
Source code in bandsaw/tasks.py
def execute_run(self, run):
"""
Execute the task with the arguments specified by the run.
Args:
run (bandsaw.run.Run): The run definition which contains how the task
should be run.
Returns:
bandsaw.result.Result: A `Result` object with either the returned value
from the task or an exception that was raised by the task.
"""
try:
result_value = self._execute(run.args, run.kwargs)
result = Result(value=result_value)
except Exception as error: # pylint: disable=W0703 # too general exception
result = Result(exception=error)
return result