# API

Automatically track data and artifacts.

This package provides an API for automatically tracking data and artifacts in a machine learning process, without having to deal with file names or S3 keys manually. Through this API, data is automatically stored and loaded in a separate version per execution, which makes it possible to compare the data between different runs.
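The core idea can be sketched with a toy in-memory stand-in for this contract (an illustration only, not the real boxs implementation): each stored value is kept under its logical name plus the id of the run that produced it, so every run keeps its own version of the same item.

```python
class ToyBox:
    """In-memory stand-in for the store/load contract (not the real boxs code)."""

    def __init__(self):
        self._items = {}

    def store(self, value, name, run_id):
        # One version per (item, run): the same logical item can coexist
        # in several runs without manual file names or S3 keys.
        self._items[(name, run_id)] = value

    def load(self, name, run_id):
        return self._items[(name, run_id)]

box = ToyBox()
box.store([1, 2, 3], name='train-data', run_id='run-a')
box.store([1, 2, 4], name='train-data', run_id='run-b')
# Both versions of 'train-data' stay loadable, so runs can be compared.
assert box.load('train-data', 'run-a') == [1, 2, 3]
assert box.load('train-data', 'run-b') == [1, 2, 4]
```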
## api

API to be used by users.
### info(data_ref)

Load info from a reference to an item.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data_ref | boxs.data.DataRef | Data reference that points to the data whose info is requested. | required |

Returns:

| Type | Description |
| --- | --- |
| boxs.data.DataInfo | The info about the data. |

Exceptions:

| Type | Description |
| --- | --- |
| boxs.errors.BoxNotDefined | If the data is stored in an unknown box. |
| boxs.errors.DataNotFound | If no data with the specific ids are stored in this box. |
Source code in `boxs/api.py`

```python
def info(data_ref):
    """
    Load info from a reference to an item.

    Args:
        data_ref (boxs.data.DataRef): Data reference that points to the data
            whose info is requested.

    Returns:
        boxs.data.DataInfo: The info about the data.

    Raises:
        boxs.errors.BoxNotDefined: If the data is stored in an unknown box.
        boxs.errors.DataNotFound: If no data with the specific ids are stored in this
            box.
    """
    box_id = data_ref.box_id
    box = get_box(box_id)
    logger.debug("Getting info about value %s from box %s", data_ref.uri, box.box_id)
    return box.info(data_ref)
```
### load(data, value_type=None)

Load the content of the data item.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | Union[boxs.data.DataRef,boxs.data.DataInfo] | DataInfo or DataRef that points to the data that should be loaded. | required |
| value_type | boxs.value_types.ValueType | The value type to use when loading the data. Defaults to `None`, in which case the same value type will be used that was used when the data was initially stored. | None |

Returns:

| Type | Description |
| --- | --- |
| Any | The loaded data. |

Exceptions:

| Type | Description |
| --- | --- |
| boxs.errors.BoxNotDefined | If the data is stored in an unknown box. |
| boxs.errors.DataNotFound | If no data with the specific ids are stored in the referenced box. |
Source code in `boxs/api.py`

```python
def load(data, value_type=None):
    """
    Load the content of the data item.

    Args:
        data (Union[boxs.data.DataRef,boxs.data.DataInfo]): DataInfo or
            DataRef that points to the data that should be loaded.
        value_type (boxs.value_types.ValueType): The value type to use when
            loading the data. Defaults to `None`, in which case the same value
            type will be used that was used when the data was initially stored.

    Returns:
        Any: The loaded data.

    Raises:
        boxs.errors.BoxNotDefined: If the data is stored in an unknown box.
        boxs.errors.DataNotFound: If no data with the specific ids are stored in the
            referenced box.
    """
    box_id = data.box_id
    box = get_box(box_id)
    logger.debug("Loading value %s from box %s", data.uri, box.box_id)
    return box.load(data, value_type=value_type)
```
### store(value, *parents, name=None, origin=ORIGIN_FROM_FUNCTION_NAME, tags=None, meta=None, value_type=None, run_id=None, box=None)

Store new data in this box.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| value | Any | A value that should be stored. | required |
| *parents | Union[boxs.data.DataInfo,boxs.data.DataRef] | Parent data refs, that this data depends on. | () |
| origin | Union[str,Callable] | A string or callable returning a string, that is used as an origin for deriving the data's id. Defaults to a callable, that takes the name of the function, from which `store` is being called as origin. | ORIGIN_FROM_FUNCTION_NAME |
| name | str | An optional user-defined name, that can be used for looking up data manually. | None |
| tags | Dict[str,str] | A dictionary of tags that can be used for grouping multiple data together. Keys and values have to be strings. | None |
| meta | Dict[str, Any] | Additional meta-data about this data. This can be used for arbitrary information that might be useful, e.g. information about type or format of the data, timestamps, user info etc. | None |
| value_type | boxs.value_types.ValueType | The value_type to use for writing this value to the storage. Defaults to `None`, in which case a suitable value type is taken from the list of predefined value types. | None |
| run_id | str | The id of the run when the data was stored. Defaults to the current global run_id (see `get_run_id()`). | None |
| box | Union[str,boxs.box.Box] | The box in which the data should be stored. The box can be either given as Box instance, or by its `box_id`. | None |

Returns:

| Type | Description |
| --- | --- |
| boxs.data.DataInfo | Data instance that contains information about the data and allows referring to it. |

Exceptions:

| Type | Description |
| --- | --- |
| ValueError | If no box or no origin was provided. |
| boxs.errors.BoxNotDefined | If no box with the given box id is defined. |
Source code in `boxs/api.py`

```python
def store(
    value,
    *parents,
    name=None,
    origin=ORIGIN_FROM_FUNCTION_NAME,
    tags=None,
    meta=None,
    value_type=None,
    run_id=None,
    box=None
):
    """
    Store new data in this box.

    Args:
        value (Any): A value that should be stored.
        *parents (Union[boxs.data.DataInfo,boxs.data.DataRef]): Parent data refs,
            that this data depends on.
        origin (Union[str,Callable]): A string or callable returning a string,
            that is used as an origin for deriving the data's id. Defaults to a
            callable, that takes the name of the function, from which `store` is
            being called as origin.
        name (str): An optional user-defined name, that can be used for looking up
            data manually.
        tags (Dict[str,str]): A dictionary of tags that can be used for grouping
            multiple data together. Keys and values have to be strings.
        meta (Dict[str, Any]): Additional meta-data about this data. This can be
            used for arbitrary information that might be useful, e.g. information
            about type or format of the data, timestamps, user info etc.
        value_type (boxs.value_types.ValueType): The value_type to use for writing
            this value to the storage. Defaults to `None` in which case a suitable
            value type is taken from the list of predefined values types.
        run_id (str): The id of the run when the data was stored. Defaults to the
            current global run_id (see `get_run_id()`).
        box (Union[str,boxs.box.Box]): The box in which the data should be stored.
            The box can be either given as Box instance, or by its `box_id`.

    Returns:
        boxs.data.DataInfo: Data instance that contains information about the
        data and allows referring to it.

    Raises:
        ValueError: If no box or no origin was provided.
        boxs.errors.BoxNotDefined: If no box with the given box id is
            defined.
    """
    if box is None:
        box = get_config().default_box
        logger.debug("No box defined, using default_box %s from config", box)
    if box is None:
        raise ValueError("'box' must be set.")
    if isinstance(box, str):
        box = get_box(box)
    origin = determine_origin(origin, name=name, tags=tags, level=3)
    return box.store(
        value,
        *parents,
        name=name,
        origin=origin,
        tags=tags,
        meta=meta,
        value_type=value_type,
        run_id=run_id
    )
```
## box

Boxes to store items in.
### Box

Box that allows storing and loading data.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| box_id | str | The id that uniquely identifies this Box. |
| storage | boxs.storage.Storage | The storage that actually writes and reads the data. |
| transformers | boxs.storage.Transformer | A tuple with transformers, that add additional meta-data and transform the data stored and loaded. |
Source code in `boxs/box.py`

```python
class Box:
    """Box that allows to store and load data.

    Attributes:
        box_id (str): The id that uniquely identifies this Box.
        storage (boxs.storage.Storage): The storage that actually writes and
            reads the data.
        transformers (boxs.storage.Transformer): A tuple with transformers, that
            add additional meta-data and transform the data stored and loaded.
    """

    def __init__(self, box_id, storage, *transformers):
        self.box_id = box_id
        self.storage = storage
        self.transformers = transformers
        self.value_types = [
            BytesValueType(),
            StreamValueType(),
            StringValueType(),
            FileValueType(),
            JsonValueType(),
        ]
        register_box(self)

    def add_value_type(self, value_type):
        """
        Add a new value type.

        The value type is added at the beginning of the list, so that it takes
        precedence over the already added value types.

        Args:
            value_type (boxs.value_types.ValueType): The new value type to add.
        """
        self.value_types.insert(0, value_type)

    def store(
        self,
        value,
        *parents,
        origin=ORIGIN_FROM_FUNCTION_NAME,
        name=None,
        tags=None,
        meta=None,
        value_type=None,
        run_id=None,
    ):
        """
        Store new data in this box.

        Args:
            value (Any): A value that should be stored.
            *parents (Union[boxs.data.DataInfo, boxs.data.DataRef]): Parent data refs,
                that this data depends on.
            origin (Union[str,Callable]): A string or callable returning a string,
                that is used as an origin for deriving the data's id. Defaults to a
                callable, that takes the name of the function, from which `store` is
                being called as origin.
            name (str): An optional user-defined name, that can be used for looking up
                data manually.
            tags (Dict[str,str]): A dictionary of tags that can be used for grouping
                multiple data together. Keys and values have to be strings.
            meta (Dict[str, Any]): Additional meta-data about this data. This can be
                used for arbitrary information that might be useful, e.g. information
                about type or format of the data, timestamps, user info etc.
            value_type (boxs.value_types.ValueType): The value_type to use for writing
                this value to the storage. Defaults to `None` in which case a suitable
                value type is taken from the list of predefined values types.
            run_id (str): The id of the run when the data was stored.

        Returns:
            boxs.data.DataInfo: Data instance that contains information about the
            data and allows referring to it.
        """
        if tags is None:
            tags = {}
        if meta is None:
            meta = {}
        else:
            meta = dict(meta)
        origin = determine_origin(origin, name=name, tags=tags, level=3)
        logger.info("Storing value in box %s with origin %s", self.box_id, origin)
        parent_ids = tuple(p.data_id for p in parents)
        data_id = calculate_data_id(origin, parent_ids=parent_ids, name=name)
        logger.debug(
            "Calculate data_id %s from origin %s with parents %s",
            data_id,
            origin,
            parent_ids,
        )
        if run_id is None:
            run_id = get_run_id()
        ref = DataRef(self.box_id, data_id, run_id)
        writer = self.storage.create_writer(ref, name, tags)
        logger.debug("Created writer %s for data %s", writer, ref)
        writer = self._apply_transformers_to_writer(writer)
        if value_type is None:
            value_type = self._find_suitable_value_type(value)
        if value_type is None:
            raise MissingValueType(value)
        logger.debug(
            "Write value for data %s with value type %s",
            ref.uri,
            value_type.get_specification(),
        )
        writer.write_value(value, value_type)
        meta['value_type'] = value_type.get_specification()
        meta = dict(meta)
        meta.update(writer.meta)
        data_info = DataInfo(
            DataRef.from_item(writer.item),
            origin=origin,
            parents=parents,
            name=name,
            tags=tags,
            meta=meta,
        )
        logger.debug("Write info for data %s", ref.uri)
        writer.write_info(data_info.value_info())
        return data_info

    def _find_suitable_value_type(self, value):
        value_type = None
        for configured_value_type in self.value_types:
            if configured_value_type.supports(value):
                value_type = configured_value_type
                logger.debug(
                    "Automatically chose value type %s",
                    value_type.get_specification(),
                )
        return value_type

    def _apply_transformers_to_writer(self, writer):
        for transformer in self.transformers:
            logger.debug("Applying transformer %s", transformer)
            writer = transformer.transform_writer(writer)
        return writer

    def load(self, data_ref, value_type=None):
        """
        Load data from the box.

        Args:
            data_ref (Union[boxs.data.DataRef,boxs.data.DataInfo]): Data reference
                that points to the data content to be loaded.
            value_type (boxs.value_types.ValueType): The value type to use when
                loading the data. Defaults to `None`, in which case the same value
                type will be used that was used when the data was initially stored.

        Returns:
            Any: The loaded data.

        Raises:
            boxs.errors.DataNotFound: If no data with the specific ids are stored
                in this box.
            ValueError: If the data refers to a different box by its box_id.
        """
        if data_ref.box_id != self.box_id:
            raise ValueError("Data references different box id")
        logger.info("Loading value %s from box %s", data_ref.uri, self.box_id)
        info = data_ref.info
        if value_type is None:
            value_type = self._get_value_type_from_meta_data(info)
        reader = self.storage.create_reader(data_ref)
        logger.debug("Created reader %s for data %s", reader, data_ref)
        reader = self._apply_transformers_to_reader(reader)
        logger.debug(
            "Read value from data %s with value type %s",
            data_ref.uri,
            value_type.get_specification(),
        )
        return reader.read_value(value_type)

    @staticmethod
    def _get_value_type_from_meta_data(info):
        value_type_specification = info.meta['value_type']
        value_type = ValueType.from_specification(value_type_specification)
        logger.debug(
            "Use value type %s taken from meta-data",
            value_type.get_specification(),
        )
        return value_type

    def _apply_transformers_to_reader(self, reader):
        for transformer in reversed(self.transformers):
            logger.debug("Applying transformer %s", transformer)
            reader = transformer.transform_reader(reader)
        return reader

    def info(self, data_ref):
        """
        Load info from the box.

        Args:
            data_ref (boxs.data.DataRef): Data reference that points to the data
                whose info is requested.

        Returns:
            boxs.data.DataInfo: The info about the data.

        Raises:
            boxs.errors.DataNotFound: If no data with the specific ids are stored
                in this box.
            ValueError: If the data refers to a different box by its box_id.
        """
        if data_ref.box_id != self.box_id:
            raise ValueError("Data references different box id")
        logger.info("Getting info for value %s from box %s", data_ref.uri, self.box_id)
        reader = self.storage.create_reader(data_ref)
        logger.debug("Created reader %s for data %s", reader, data_ref)
        return DataInfo.from_value_info(reader.info)
```
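One detail worth noting in the source above: transformers wrap writers in registration order, while readers are wrapped in reversed order, so the nesting mirrors itself between the write and read paths. A minimal sketch of that layering, with plain strings standing in for real readers and writers:

```python
transformers = ['A', 'B']

def apply_to_writer(stage, transformers):
    # Writers are wrapped in registration order: A first, then B around it.
    for t in transformers:
        stage = f"{t}({stage})"
    return stage

def apply_to_reader(stage, transformers):
    # Readers are wrapped in reverse, mirroring the write path.
    for t in reversed(transformers):
        stage = f"{t}({stage})"
    return stage

assert apply_to_writer('writer', transformers) == 'B(A(writer))'
assert apply_to_reader('reader', transformers) == 'A(B(reader))'
```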
### add_value_type(self, value_type)

Add a new value type.

The value type is added at the beginning of the list, so that it takes precedence over the already added value types.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| value_type | boxs.value_types.ValueType | The new value type to add. | required |
Source code in `boxs/box.py`

```python
def add_value_type(self, value_type):
    """
    Add a new value type.

    The value type is added at the beginning of the list, so that it takes
    precedence over the already added value types.

    Args:
        value_type (boxs.value_types.ValueType): The new value type to add.
    """
    self.value_types.insert(0, value_type)
```
### info(self, data_ref)

Load info from the box.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data_ref | boxs.data.DataRef | Data reference that points to the data whose info is requested. | required |

Returns:

| Type | Description |
| --- | --- |
| boxs.data.DataInfo | The info about the data. |

Exceptions:

| Type | Description |
| --- | --- |
| boxs.errors.DataNotFound | If no data with the specific ids are stored in this box. |
| ValueError | If the data refers to a different box by its box_id. |
Source code in `boxs/box.py`

```python
def info(self, data_ref):
    """
    Load info from the box.

    Args:
        data_ref (boxs.data.DataRef): Data reference that points to the data
            whose info is requested.

    Returns:
        boxs.data.DataInfo: The info about the data.

    Raises:
        boxs.errors.DataNotFound: If no data with the specific ids are stored
            in this box.
        ValueError: If the data refers to a different box by its box_id.
    """
    if data_ref.box_id != self.box_id:
        raise ValueError("Data references different box id")
    logger.info("Getting info for value %s from box %s", data_ref.uri, self.box_id)
    reader = self.storage.create_reader(data_ref)
    logger.debug("Created reader %s for data %s", reader, data_ref)
    return DataInfo.from_value_info(reader.info)
```
### load(self, data_ref, value_type=None)

Load data from the box.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data_ref | Union[boxs.data.DataRef,boxs.data.DataInfo] | Data reference that points to the data content to be loaded. | required |
| value_type | boxs.value_types.ValueType | The value type to use when loading the data. Defaults to `None`, in which case the same value type will be used that was used when the data was initially stored. | None |

Returns:

| Type | Description |
| --- | --- |
| Any | The loaded data. |

Exceptions:

| Type | Description |
| --- | --- |
| boxs.errors.DataNotFound | If no data with the specific ids are stored in this box. |
| ValueError | If the data refers to a different box by its box_id. |
Source code in `boxs/box.py`

```python
def load(self, data_ref, value_type=None):
    """
    Load data from the box.

    Args:
        data_ref (Union[boxs.data.DataRef,boxs.data.DataInfo]): Data reference
            that points to the data content to be loaded.
        value_type (boxs.value_types.ValueType): The value type to use when
            loading the data. Defaults to `None`, in which case the same value
            type will be used that was used when the data was initially stored.

    Returns:
        Any: The loaded data.

    Raises:
        boxs.errors.DataNotFound: If no data with the specific ids are stored
            in this box.
        ValueError: If the data refers to a different box by its box_id.
    """
    if data_ref.box_id != self.box_id:
        raise ValueError("Data references different box id")
    logger.info("Loading value %s from box %s", data_ref.uri, self.box_id)
    info = data_ref.info
    if value_type is None:
        value_type = self._get_value_type_from_meta_data(info)
    reader = self.storage.create_reader(data_ref)
    logger.debug("Created reader %s for data %s", reader, data_ref)
    reader = self._apply_transformers_to_reader(reader)
    logger.debug(
        "Read value from data %s with value type %s",
        data_ref.uri,
        value_type.get_specification(),
    )
    return reader.read_value(value_type)
```
### store(self, value, *parents, origin=ORIGIN_FROM_FUNCTION_NAME, name=None, tags=None, meta=None, value_type=None, run_id=None)

Store new data in this box.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| value | Any | A value that should be stored. | required |
| *parents | Union[boxs.data.DataInfo, boxs.data.DataRef] | Parent data refs, that this data depends on. | () |
| origin | Union[str,Callable] | A string or callable returning a string, that is used as an origin for deriving the data's id. Defaults to a callable, that takes the name of the function, from which `store` is being called as origin. | ORIGIN_FROM_FUNCTION_NAME |
| name | str | An optional user-defined name, that can be used for looking up data manually. | None |
| tags | Dict[str,str] | A dictionary of tags that can be used for grouping multiple data together. Keys and values have to be strings. | None |
| meta | Dict[str, Any] | Additional meta-data about this data. This can be used for arbitrary information that might be useful, e.g. information about type or format of the data, timestamps, user info etc. | None |
| value_type | boxs.value_types.ValueType | The value_type to use for writing this value to the storage. Defaults to `None`, in which case a suitable value type is taken from the list of predefined value types. | None |
| run_id | str | The id of the run when the data was stored. | None |

Returns:

| Type | Description |
| --- | --- |
| boxs.data.DataInfo | Data instance that contains information about the data and allows referring to it. |
Source code in `boxs/box.py`

```python
def store(
    self,
    value,
    *parents,
    origin=ORIGIN_FROM_FUNCTION_NAME,
    name=None,
    tags=None,
    meta=None,
    value_type=None,
    run_id=None,
):
    """
    Store new data in this box.

    Args:
        value (Any): A value that should be stored.
        *parents (Union[boxs.data.DataInfo, boxs.data.DataRef]): Parent data refs,
            that this data depends on.
        origin (Union[str,Callable]): A string or callable returning a string,
            that is used as an origin for deriving the data's id. Defaults to a
            callable, that takes the name of the function, from which `store` is
            being called as origin.
        name (str): An optional user-defined name, that can be used for looking up
            data manually.
        tags (Dict[str,str]): A dictionary of tags that can be used for grouping
            multiple data together. Keys and values have to be strings.
        meta (Dict[str, Any]): Additional meta-data about this data. This can be
            used for arbitrary information that might be useful, e.g. information
            about type or format of the data, timestamps, user info etc.
        value_type (boxs.value_types.ValueType): The value_type to use for writing
            this value to the storage. Defaults to `None` in which case a suitable
            value type is taken from the list of predefined values types.
        run_id (str): The id of the run when the data was stored.

    Returns:
        boxs.data.DataInfo: Data instance that contains information about the
        data and allows referring to it.
    """
    if tags is None:
        tags = {}
    if meta is None:
        meta = {}
    else:
        meta = dict(meta)
    origin = determine_origin(origin, name=name, tags=tags, level=3)
    logger.info("Storing value in box %s with origin %s", self.box_id, origin)
    parent_ids = tuple(p.data_id for p in parents)
    data_id = calculate_data_id(origin, parent_ids=parent_ids, name=name)
    logger.debug(
        "Calculate data_id %s from origin %s with parents %s",
        data_id,
        origin,
        parent_ids,
    )
    if run_id is None:
        run_id = get_run_id()
    ref = DataRef(self.box_id, data_id, run_id)
    writer = self.storage.create_writer(ref, name, tags)
    logger.debug("Created writer %s for data %s", writer, ref)
    writer = self._apply_transformers_to_writer(writer)
    if value_type is None:
        value_type = self._find_suitable_value_type(value)
    if value_type is None:
        raise MissingValueType(value)
    logger.debug(
        "Write value for data %s with value type %s",
        ref.uri,
        value_type.get_specification(),
    )
    writer.write_value(value, value_type)
    meta['value_type'] = value_type.get_specification()
    meta = dict(meta)
    meta.update(writer.meta)
    data_info = DataInfo(
        DataRef.from_item(writer.item),
        origin=origin,
        parents=parents,
        name=name,
        tags=tags,
        meta=meta,
    )
    logger.debug("Write info for data %s", ref.uri)
    writer.write_info(data_info.value_info())
    return data_info
```
### calculate_data_id(origin, parent_ids=(), name=None)

Derive a data_id from origin and parent_ids.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| origin | str | The origin of the data. | required |
| parent_ids | tuple[str] | A tuple of data_ids of "parent" data, that this data is derived from. | () |
| name | str | An optional user-defined name that is also included in deriving the id. | None |

Returns:

| Type | Description |
| --- | --- |
| str | The data_id. |
Source code in `boxs/box.py`

```python
def calculate_data_id(origin, parent_ids=tuple(), name=None):
    """
    Derive a data_id from origin and parent_ids.

    Args:
        origin (str): The origin of the data.
        parent_ids (tuple[str]): A tuple of data_ids of "parent" data, that this data
            is derived from.

    Returns:
        str: The data_id.
    """
    id_origin_data = ':'.join(
        [
            origin,
            name or '',
        ]
        + sorted(parent_ids)
    )
    return hashlib.blake2b(id_origin_data.encode('utf-8'), digest_size=8).hexdigest()
```
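Because the parent ids are sorted before hashing, the derived id is independent of the order in which parents are passed, and the 8-byte blake2b digest always yields a 16-character hex string. These properties can be checked with a stand-alone copy of the function above:

```python
import hashlib

def calculate_data_id(origin, parent_ids=(), name=None):
    # Mirror of the derivation shown above: origin, name and the *sorted*
    # parent ids are joined and hashed with an 8-byte blake2b digest.
    id_origin_data = ':'.join([origin, name or ''] + sorted(parent_ids))
    return hashlib.blake2b(id_origin_data.encode('utf-8'), digest_size=8).hexdigest()

# Parent order does not matter, because the ids are sorted before hashing.
assert calculate_data_id('prep', parent_ids=('a', 'b')) == \
    calculate_data_id('prep', parent_ids=('b', 'a'))
# A different name yields a different id.
assert calculate_data_id('prep', name='x') != calculate_data_id('prep', name='y')
# An 8-byte digest is rendered as 16 hex characters.
assert len(calculate_data_id('prep')) == 16
```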
## box_registry

Registry of boxes.
### get_box(box_id=None)

Return the box with the given box_id.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| box_id | Optional[str] | The id of the box that should be returned. Defaults to `None`, in which case the default box is taken from the config and returned. | None |

Returns:

| Type | Description |
| --- | --- |
| boxs.box.Box | The box with the given `box_id`. |

Exceptions:

| Type | Description |
| --- | --- |
| boxs.errors.BoxNotDefined | If no box with the given id is defined. |
Source code in `boxs/box_registry.py`

```python
def get_box(box_id=None):
    """
    Return the box with the given box_id.

    Args:
        box_id (Optional[str]): The id of the box that should be returned. Defaults
            to `None` in which case the default box is taken from the config and
            returned.

    Returns:
        boxs.box.Box: The box with the given `box_id`.

    Raises:
        boxs.errors.BoxNotDefined: If no box with the given id is defined.
    """
    logger.debug("Getting box %s", box_id)
    if box_id is None:
        box_id = get_config().default_box
        logger.debug("Using default_box %s from config", box_id)
    if box_id not in _BOX_REGISTRY:
        raise BoxNotDefined(box_id)
    return _BOX_REGISTRY[box_id]
```
### register_box(box)

Registers a new box.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| box | boxs.box.Box | The box that should be registered. | required |

Exceptions:

| Type | Description |
| --- | --- |
| boxs.errors.BoxAlreadyDefined | If a box with the same id is already registered. |
Source code in `boxs/box_registry.py`

```python
def register_box(box):
    """
    Registers a new box.

    Args:
        box (boxs.box.Box): The box that should be registered.

    Raises:
        boxs.errors.BoxAlreadyDefined: If a box with the same id is already
            registered.
    """
    box_id = box.box_id
    logger.info("Registering box %s", box_id)
    if box_id in _BOX_REGISTRY:
        raise BoxAlreadyDefined(box_id)
    _BOX_REGISTRY[box.box_id] = box
```
### unregister_box(box_id)

Unregisters the box with the given box_id.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| box_id | str | The id of the box that should be removed. | required |

Exceptions:

| Type | Description |
| --- | --- |
| boxs.errors.BoxNotDefined | If no box with the given id is defined. |
Source code in `boxs/box_registry.py`

```python
def unregister_box(box_id):
    """
    Unregisters the box with the given box_id.

    Args:
        box_id (str): The id of the box that should be removed.

    Raises:
        boxs.errors.BoxNotDefined: If no box with the given id is defined.
    """
    logger.info("Unregistering box %s", box_id)
    if box_id not in _BOX_REGISTRY:
        raise BoxNotDefined(box_id)
    del _BOX_REGISTRY[box_id]
```
## checksum

Checksum data to detect errors.

### ChecksumTransformer (Transformer)

Transformer that calculates and verifies the checksums of data.

The transformer adds three values to the data's meta-data:

- 'checksum_digest': The hex-string representation of the checksum.
- 'checksum_digest_size': The size in bytes of the checksum (not its representation).
- 'checksum_algorithm': The hashing algorithm which is used for calculating the checksum. Currently, only 'blake2b' is supported.
Source code in `boxs/checksum.py`

```python
class ChecksumTransformer(Transformer):
    """
    Transformer that calculates and verifies the checksums of data.

    The transformer adds three values to the data's meta data:
    - 'checksum_digest': The hex-string representation of the checksum.
    - 'checksum_digest_size': The size in bytes of the checksum (not its
      representation).
    - 'checksum_algorithm': The hashing algorithm which is used for calculating
      the checksum. Currently, only 'blake2b' is supported.
    """

    def __init__(self, digest_size=32):
        """
        Create a new ChecksumTransformer.

        Args:
            digest_size (int): Length of the checksum in bytes. Defaults to `32`.
                Since a checksum is represented as a hex-string, where a single byte
                is represented by two characters, the length of the resulting checksum
                string will be twice of the `digest_size`.
        """
        self.digest_size = digest_size

    def transform_reader(self, reader):
        return _ChecksumReader(reader, default_digest_size=self.digest_size)

    def transform_writer(self, writer):
        return _ChecksumWriter(writer, digest_size=self.digest_size)
```
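The relationship between `digest_size` and the stored 'checksum_digest' string follows directly from how blake2b digests are rendered: two hex characters per byte. A quick stdlib check (the payload here is arbitrary):

```python
import hashlib

payload = b"some stored artifact bytes"
digest = hashlib.blake2b(payload, digest_size=32).hexdigest()

# The default digest_size of 32 bytes yields a 64-character hex string.
assert len(digest) == 64
# The checksum is deterministic, so a re-read can be verified against it.
assert hashlib.blake2b(payload, digest_size=32).hexdigest() == digest
```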
### __init__(self, digest_size=32) (special)

Create a new ChecksumTransformer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| digest_size | int | Length of the checksum in bytes. Defaults to `32`. Since a checksum is represented as a hex-string, where a single byte is represented by two characters, the length of the resulting checksum string will be twice of the `digest_size`. | 32 |
Source code in `boxs/checksum.py`

```python
def __init__(self, digest_size=32):
    """
    Create a new ChecksumTransformer.

    Args:
        digest_size (int): Length of the checksum in bytes. Defaults to `32`.
            Since a checksum is represented as a hex-string, where a single byte
            is represented by two characters, the length of the resulting checksum
            string will be twice of the `digest_size`.
    """
    self.digest_size = digest_size
```
### transform_reader(self, reader)

Transform a given reader.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| reader | boxs.storage.Reader | Reader object that is used for reading data content and meta-data. | required |

Returns:

| Type | Description |
| --- | --- |
| boxs.storage.Reader | A modified reader that will be used instead. |

Source code in `boxs/checksum.py`

```python
def transform_reader(self, reader):
    return _ChecksumReader(reader, default_digest_size=self.digest_size)
```
### transform_writer(self, writer)

Transform a given writer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| writer | boxs.storage.Writer | Writer object that is used for writing new data content and meta-data. | required |

Returns:

| Type | Description |
| --- | --- |
| boxs.storage.Writer | A modified writer that will be used instead. |

Source code in `boxs/checksum.py`

```python
def transform_writer(self, writer):
    return _ChecksumWriter(writer, digest_size=self.digest_size)
```
### DataChecksumMismatch (DataError)

Exception that is raised if a checksum doesn't match.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| item | boxs.storage.Item | The item where the mismatch occurred. |
| expected | str | Checksum that was expected. |
| calculated | str | Checksum that was actually calculated. |
Source code in `boxs/checksum.py`

```python
class DataChecksumMismatch(DataError):
    """
    Exception that is raised if a checksum doesn't match.

    Attributes:
        item (boxs.storage.Item): The item where the mismatch occurred.
        expected (str): Checksum that was expected.
        calculated (str): Checksum that was actually calculated.
    """

    def __init__(self, item, expected, calculated):
        self.item = item
        self.expected = expected
        self.calculated = calculated
        super().__init__(
            f"{self.item} has wrong checksum '{self.calculated}'"
            f", expected '{self.expected}'"
        )
```
## cli

Command line interface.
### clean_runs_command(args)

Function that removes old runs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| args | argparse.Namespace | The parsed arguments from command line. | required |
Source code in `boxs/cli.py`

```python
def clean_runs_command(args):
    """
    Function that removes old runs.

    Args:
        args (argparse.Namespace): The parsed arguments from command line.
    """
    box = get_box()
    storage = box.storage
    logger.info("Removing runs in box %s", box.box_id)
    runs = storage.list_runs(box.box_id)
    runs_to_keep = set(runs[: args.count])
    if not args.remove_named:
        _keep_runs_with_name(runs, runs_to_keep)
    if not args.ignore_dependencies:
        _keep_runs_that_are_dependencies(runs_to_keep, storage)
    runs_to_delete = [run for run in runs if run not in runs_to_keep]
    _print_result("Delete runs", runs_to_delete, args)
    if runs_to_delete:
        if not args.quiet:
            if not _confirm("Really delete all listed runs? (y/N)"):
                return
        for run in runs_to_delete:
            box.storage.delete_run(run.box_id, run.run_id)
```
### delete_run_command(args)

Command that deletes a specific run.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| args | argparse.Namespace | The parsed arguments from command line. | required |
Source code in `boxs/cli.py`

```python
def delete_run_command(args):
    """
    Command that allows to delete a specific run.

    Args:
        args (argparse.Namespace): The parsed arguments from command line.
    """
    box = get_box()
    storage = box.storage
    run = _get_run_from_args(args)
    if run is None:
        return
    logger.info(
        "Deleting run %s in box %s",
        run.run_id,
        box.box_id,
    )
    if not args.quiet:
        if not _confirm(
            f"Really delete the run {run.run_id}? There might be other "
            f"runs referencing data from it. (y/N)"
        ):
            return
    storage.delete_run(box.box_id, run.run_id)
    _print_result(f"Run {run.run_id} deleted.", [run], args)
```
### diff_command(args)

Command that compares two runs or data items.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| args | argparse.Namespace | The parsed arguments from command line. | required |
Source code in `boxs/cli.py`

```python
def diff_command(args):
    """
    Command that compares two runs or data items.

    Args:
        args (argparse.Namespace): The parsed arguments from command line.
    """

    def _get_data_item_as_file(ref):
        return ref.load(value_type=FileValueType())

    results = []
    for obj_string in args.queries:
        item_query = _parse_query(obj_string)
        box = get_box(item_query.box)
        item_query.box = box.box_id
        results.append(box.storage.list_items(item_query))
    if len(results[0]) == 1 and len(results[1]) == 1:
        first_ref = DataRef.from_item(results[0][0])
        second_ref = DataRef.from_item(results[1][0])
        logger.info(
            "Showing diff between items %s and %s",
            first_ref.uri,
            second_ref.uri,
        )
        first_file_path = _get_data_item_as_file(first_ref)
        first_label = args.queries[0]
        second_file_path = _get_data_item_as_file(second_ref)
        second_label = args.queries[1]
        command = [args.diff, str(first_file_path), str(second_file_path)]
        if args.labels:
            command.extend(
                [
                    '--label',
                    first_label,
                    '--label',
                    second_label,
                ]
            )
        command.extend(args.diff_args)
        logger.info("Calling diff %s", command)
        subprocess.run(command, stdout=sys.stdout, stderr=sys.stderr, check=False)
    else:
        _print_error("Ambiguous values to diff.", args)
```
### export_command(args)

Command that exports a data item to a file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| args | argparse.Namespace | The parsed arguments from command line. | required |
Source code in boxs/cli.py
def export_command(args):
"""
Command that exports a data item to a file.
Args:
args (argparse.Namespace): The parsed arguments from command line.
"""
def _export_item_as_file(ref, file_path):
return ref.load(value_type=FileValueType(file_path=file_path))
item_query = _parse_query(args.query)
box = get_box(item_query.box)
item_query.box = box.box_id
items = box.storage.list_items(item_query)
if len(items) == 0:
_print_error(f"No item found for {args.query}.", args)
elif len(items) > 1:
_print_error(f"Multiple items found for {args.query}.", args)
_print_result('', items, args)
else:
ref = DataRef.from_item(items[0])
export_file_path = pathlib.Path(args.file)
logger.info("Exporting item %s to file %s", ref.uri, export_file_path)
_export_item_as_file(ref, export_file_path)
_print_result(f"{args.query} successfully exported to {args.file}", [], args)
graph_command(args)
🔗
Command that creates a graph out of data items.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args | argparse.Namespace | The parsed arguments from command line. | required |
Source code in boxs/cli.py
def graph_command(args):
"""
Command that creates a graph out of data items.
Args:
args (argparse.Namespace): The parsed arguments from command line.
"""
item_query = _parse_query(args.query)
if item_query.box is None:
item_query.box = get_config().default_box
box = get_box(item_query.box)
items = box.storage.list_items(item_query)
refs = [DataRef.from_item(item) for item in items]
if args.file == '-':
writer = sys.stdout
else:
writer = io.FileIO(args.file, 'w')
writer = codecs.getwriter('utf-8')(writer)
with writer:
write_graph_of_refs(writer, refs)
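When writing to a file, `graph_command` wraps a binary stream in a UTF-8 codec writer before handing it to `write_graph_of_refs`. A minimal sketch of that wrapping pattern, using an in-memory buffer instead of a real file:

```python
import codecs
import io

# A binary sink standing in for io.FileIO(args.file, 'w').
raw = io.BytesIO()

# codecs.getwriter('utf-8') returns a StreamWriter class; instantiating it
# around the binary stream yields a text-accepting writer that encodes on write.
writer = codecs.getwriter('utf-8')(raw)
writer.write('boxs://box-1/data-1/run-1\n')
writer.flush()

assert raw.getvalue() == b'boxs://box-1/data-1/run-1\n'
```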
info_command(args)
🔗
Command that shows the information about a data item.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args | argparse.Namespace | The parsed arguments from command line. | required |
Source code in boxs/cli.py
def info_command(args):
"""
Command that shows the information about a data item.
Args:
args (argparse.Namespace): The parsed arguments from command line.
"""
item_query = _parse_query(args.query[0])
box = get_box(item_query.box)
item_query.box = box.box_id
items = box.storage.list_items(item_query)
if len(items) == 0:
_print_error(f"No item found by query {args.query[0]}", args)
return
if len(items) > 1:
_print_error(f"Multiple items found by query {args.query[0]}", args)
_print_result('', items, args)
return
item = items[0]
logger.info(
"Showing info about item %s from run %s in box %s",
item.data_id,
item.run_id,
item.box_id,
)
info = box.storage.create_reader(DataRef.from_item(item)).info
_print_result(f"Info {item.data_id} {item.run_id}", info, args)
list_command(args)
🔗
Function that lists the data items of a specific run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args | argparse.Namespace | The parsed arguments from command line. | required |
Source code in boxs/cli.py
def list_command(args):
"""
Function that lists the data items of a specific run.
Args:
args (argparse.Namespace): The parsed arguments from command line.
"""
item_query = _parse_query(args.query[0])
logger.info("Listing items by query %s", item_query)
box = get_box(item_query.box)
item_query.box = box.box_id
items = box.storage.list_items(item_query)
if len(items) == 0:
_print_error(f"No items found by query {args.query[0]}", args)
return
_print_result(f"List items {item_query}", items, args)
list_runs_command(args)
🔗
Function that lists runs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args | argparse.Namespace | The parsed arguments from command line. | required |
Source code in boxs/cli.py
def list_runs_command(args):
"""
Function that lists runs.
Args:
args (argparse.Namespace): The parsed arguments from command line.
"""
box = get_box()
storage = box.storage
logger.info("Listing all runs in box %s", box.box_id)
runs = storage.list_runs(box.box_id, name_filter=args.filter, limit=args.limit)
_print_result("List runs", runs, args)
main(argv=None)
🔗
main() method of our command line interface.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
argv | List[str] | Command line arguments given to the function. If `None`, the arguments are taken from `sys.argv`. | None |
Source code in boxs/cli.py
def main(argv=None):
"""
main() method of our command line interface.
Args:
argv (List[str]): Command line arguments given to the function. If `None`, the
arguments are taken from `sys.argv`.
"""
argv = argv or sys.argv[1:]
boxs_home_dir = pathlib.Path.home() / '.boxs'
boxs_home_dir.mkdir(exist_ok=True)
file_handler = logging.FileHandler(boxs_home_dir / 'cli.log')
file_handler.level = logging.DEBUG
file_handler.setFormatter(
logging.Formatter(fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
logging.basicConfig(
level=logging.DEBUG,
handlers=[file_handler],
)
logger.debug("Command line arguments: %s", argv)
parser = argparse.ArgumentParser(
prog='boxs',
description="Allows inspecting and manipulating boxes that are used for "
"storing data items using the python 'boxs' library.",
)
parser.set_defaults(command=lambda _: parser.print_help())
parser.add_argument(
'-b',
'--default-box',
metavar='BOX',
dest='default_box',
help="The id of the default box to use. If not set, the default is taken "
"from the BOXS_DEFAULT_BOX environment variable.",
)
parser.add_argument(
'-i',
'--init-module',
dest='init_module',
help="A python module that should be automatically loaded. If not set, the "
"default is taken from the BOXS_INIT_MODULE environment variable.",
)
parser.add_argument(
'-j',
'--json',
dest='json',
action='store_true',
help="Print output as json",
)
subparsers = parser.add_subparsers(help="Commands")
_add_list_runs_command(subparsers)
_add_name_run_command(subparsers)
_add_delete_run_command(subparsers)
_add_clean_runs_command(subparsers)
_add_list_command(subparsers)
_add_info_command(subparsers)
_add_diff_command(subparsers)
_add_export_command(subparsers)
_add_graph_command(subparsers)
args = parser.parse_args(argv)
config = get_config()
if args.default_box:
config.default_box = args.default_box
if args.init_module:
config.init_module = args.init_module
try:
args.command(args)
except BoxsError as error:
_print_error(error, args)
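`main()` dispatches to subcommands by letting every subparser register its handler as the `command` default and then calling `args.command(args)`. A self-contained sketch of that argparse pattern, with trivial stand-in handlers:

```python
import argparse

# Each (sub)parser stores its handler under the 'command' attribute; the
# top-level default prints help, mirroring parser.print_help() in main().
parser = argparse.ArgumentParser(prog='demo')
parser.set_defaults(command=lambda _args: 'help')

subparsers = parser.add_subparsers(help="Commands")
list_parser = subparsers.add_parser('list')
list_parser.set_defaults(command=lambda _args: 'listed')

args = parser.parse_args(['list'])
result = args.command(args)  # dispatches to the 'list' handler
```

This keeps `main()` free of any if/elif chain over subcommand names: adding a command only requires adding a subparser that sets its own `command` default.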
name_run_command(args)
🔗
Command that allows setting a name for a specific run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args | argparse.Namespace | The parsed arguments from command line. | required |
Source code in boxs/cli.py
def name_run_command(args):
"""
Command that allows setting a name for a specific run.
Args:
args (argparse.Namespace): The parsed arguments from command line.
"""
box = get_box()
storage = box.storage
run = _get_run_from_args(args)
if run is None:
return
logger.info(
"Setting name of run %s in box %s to %s",
run.run_id,
box.box_id,
args.name,
)
run = storage.set_run_name(box.box_id, run.run_id, args.name)
_print_result(f"Run name set {run.run_id}", [run], args)
config
🔗
Configuration for Boxs
Configuration
🔗
Class that contains the individual config values.
Attributes:
Name | Type | Description |
---|---|---|
default_box | str | The id of a box that should be used if no other box id is specified. Will be initialized from the `BOXS_DEFAULT_BOX` environment variable if defined, otherwise is initialized to `None`. |
init_module | str | The name of a python module that should be automatically loaded at initialization time. Ideally, loading this module should trigger the definition of all boxes that are used, so that they can be found if needed. Setting this to a new module name will lead to an import of the module. Will be initialized from the `BOXS_INIT_MODULE` environment variable if defined, otherwise is initialized to `None`. |
Source code in boxs/config.py
class Configuration:
"""
Class that contains the individual config values.
Attributes:
default_box (str): The id of a box that should be used if no other box id is
specified. Will be initialized from the `BOXS_DEFAULT_BOX` environment
variable if defined, otherwise is initialized to `None`.
init_module (str): The name of a python module, that should be automatically
loaded at initialization time. Ideally, the loading of this module should
trigger the definition of all boxes that are used, so that they can be
found if needed. Setting this to a new module name will lead to an import
of the module. Will be initialized from the `BOXS_INIT_MODULE` environment
variable if defined, otherwise is initialized to `None`.
"""
def __init__(self):
self._initialized = False
self.default_box = os.environ.get('BOXS_DEFAULT_BOX', None)
logger.info("Setting default_box to %s", self.default_box)
self.init_module = os.environ.get('BOXS_INIT_MODULE', None)
logger.info("Setting init_module to %s", self.init_module)
@property
def default_box(self):
"""
Returns the id of the default box.
Returns:
str: The id of the default box.
"""
return self._default_box
@default_box.setter
def default_box(self, default_box):
"""
Set the id of the default box.
Args:
default_box (str): The id of the box that should be used if no box is
specified.
"""
self._default_box = default_box
@property
def init_module(self):
"""
Returns the name of the init_module that is used in this configuration.
Returns:
str: The name of the init_module that is used.
"""
return self._init_module
@init_module.setter
def init_module(self, init_module):
"""
Set the name of the init_module.
Setting this value might lead to the module being imported, if boxs is
properly initialized.
Args:
init_module (str): The name of the module to use for initialization.
"""
self._init_module = init_module
self._load_init_module()
@property
def initialized(self):
"""
Returns if boxs is completely initialized.
Returns:
bool: `True` if the boxs library is initialized, otherwise `False`.
"""
return self._initialized
@initialized.setter
def initialized(self, initialized):
"""
Set the initialization status of boxs.
Setting this value to `True` might lead to the init_module being imported, if
`init_module` is set.
Args:
initialized (bool): If the library is fully initialized.
"""
if self._initialized and not initialized:
self._initialized = False
if not self._initialized and initialized:
self._initialized = True
self._load_init_module()
def _load_init_module(self):
if self.init_module is not None and self.initialized:
logger.info("Import init_module %s", self.init_module)
try:
importlib.import_module(self.init_module)
except ImportError as import_error:
self.initialized = False
raise import_error
default_box
property
writable
🔗
Returns the id of the default box.
Returns:
Type | Description |
---|---|
str | The id of the default box. |
init_module
property
writable
🔗
Returns the name of the init_module that is used in this configuration.
Returns:
Type | Description |
---|---|
str | The name of the init_module that is used. |
initialized
property
writable
🔗
Returns if boxs is completely initialized.
Returns:
Type | Description |
---|---|
bool | `True` if the boxs library is initialized, otherwise `False`. |
get_config()
🔗
Returns the configuration.
Returns:
Type | Description |
---|---|
boxs.config.Configuration | The configuration. |
Source code in boxs/config.py
def get_config():
"""
Returns the configuration.
Returns:
boxs.config.Configuration: The configuration.
"""
global _CONFIG # pylint: disable=global-statement
if _CONFIG is None:
logger.info("Create new configuration")
_CONFIG = Configuration()
_CONFIG.initialized = True
return _CONFIG
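`get_config()` is a lazy singleton: the module-level `_CONFIG` is created on first access, and `Configuration.__init__` seeds its defaults from environment variables. A stdlib-only stand-in for that pattern (simplified, without the `initialized`/`init_module` machinery):

```python
import os

_CONFIG = None


class Configuration:
    """Stand-in config object seeded from an environment variable."""

    def __init__(self):
        # Mirrors boxs.config: fall back to None when the variable is unset.
        self.default_box = os.environ.get('BOXS_DEFAULT_BOX', None)


def get_config():
    """Create the configuration on first call, then return the same instance."""
    global _CONFIG
    if _CONFIG is None:
        _CONFIG = Configuration()
    return _CONFIG


assert get_config() is get_config()  # every caller sees the same object
```

Because all callers share one instance, setting `config.default_box` once (e.g. from the CLI's `--default-box` flag) affects every later `get_box()` lookup.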
data
🔗
Classes representing data items and references
DataInfo
🔗
Class representing a stored data item.
Attributes:
Name | Type | Description |
---|---|---|
ref | boxs.data.DataRef | Reference to this item. |
origin | str | The origin of the data. |
parents | Tuple[boxs.data.DataItem] | A tuple containing other data items from which this item was derived. |
name | Optional[str] | A string that can be used by a user to refer to this item. Defaults to `None`. |
tags | Dict[str,str] | A dictionary containing string keys and values that can be used for grouping multiple items together. Defaults to an empty dict. |
meta | Dict[str,Any] | A dictionary containing meta-data. This meta-data can have arbitrary values as long as they can be serialized to JSON. Defaults to an empty dict. |
Source code in boxs/data.py
class DataInfo:
"""
Class representing a stored data item.
Attributes:
ref (boxs.data.DataRef): Reference to this item.
origin (str): The origin of the data.
parents (Tuple[boxs.data.DataItem]): A tuple containing other data items
from which this item was derived.
name (Optional[str]): A string that can be used by a user to refer to this
item. Defaults to `None`.
tags (Dict[str,str]): A dictionary containing string keys and values, that can
be used for grouping multiple items together. Defaults to an empty dict.
meta (Dict[str,Any]): A dictionary containing meta-data. This meta-data can
have arbitrary values as long as they can be serialized to JSON. Defaults
to an empty dict.
"""
__slots__ = [
'ref',
'origin',
'name',
'parents',
'tags',
'meta',
]
def __init__(
self,
ref,
origin,
parents=tuple(),
name=None,
tags=None,
meta=None,
): # pylint: disable=too-many-arguments
self.ref = ref
self.origin = origin
self.parents = parents
self.name = name
self.tags = tags or {}
self.meta = meta or {}
@property
def data_id(self):
"""Returns the data_id."""
return self.ref.data_id
@property
def box_id(self):
"""Returns the box_id."""
return self.ref.box_id
@property
def run_id(self):
"""Returns the run_id."""
return self.ref.run_id
@property
def uri(self):
"""Returns the uri."""
return self.ref.uri
@property
def info(self):
"""Returns the info. This is to be compatible with DataRef"""
return self
def load(self, value_type=None):
"""
Load the content of the data item.
Args:
value_type (boxs.value_types.ValueType): The value type to use when
loading the data. Defaults to `None`, in which case the same value
type will be used that was used when the data was initially stored.
Returns:
Any: The loaded data.
Raises:
boxs.errors.BoxNotDefined: If the data is stored in an unknown box.
boxs.errors.DataNotFound: If no data with the specific ids are stored
in the referenced box.
"""
return load(self, value_type=value_type)
def value_info(self):
"""
Returns information about this data item.
Returns:
Dict[str,str]: A dict containing information about this reference.
"""
value_info = {
'ref': self.ref.value_info(),
'origin': self.origin,
'name': self.name,
'tags': self.tags,
'parents': [parent.value_info() for parent in self.parents],
'meta': self.meta,
}
return value_info
@classmethod
def from_value_info(cls, value_info):
"""
Recreate a DataInfo from its value_info.
Args:
value_info (Dict[str,str]): A dictionary containing the info.
Returns:
boxs.data.DataInfo: The information about the data item.
Raises:
KeyError: If necessary attributes are missing from the `value_info`.
"""
if 'ref' not in value_info:
return DataRef.from_value_info(value_info)
data_ref = DataRef.from_value_info(value_info['ref'])
origin = value_info['origin']
name = value_info['name']
tags = value_info['tags']
meta = value_info['meta']
parents = tuple(
DataInfo.from_value_info(parent_info)
for parent_info in value_info['parents']
)
return DataInfo(
data_ref,
origin,
parents,
name=name,
tags=tags,
meta=meta,
)
def __str__(self):
return self.uri
box_id
property
readonly
🔗
Returns the box_id.
data_id
property
readonly
🔗
Returns the data_id.
info
property
readonly
🔗
Returns the info. This is to be compatible with DataRef
run_id
property
readonly
🔗
Returns the run_id.
uri
property
readonly
🔗
Returns the uri.
from_value_info(value_info)
classmethod
🔗
Recreate a DataInfo from its value_info.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value_info | Dict[str,str] | A dictionary containing the info. | required |
Returns:
Type | Description |
---|---|
boxs.data.DataInfo | The information about the data item. |
Exceptions:
Type | Description |
---|---|
KeyError | If necessary attributes are missing from the `value_info`. |
Source code in boxs/data.py
@classmethod
def from_value_info(cls, value_info):
"""
Recreate a DataInfo from its value_info.
Args:
value_info (Dict[str,str]): A dictionary containing the info.
Returns:
boxs.data.DataInfo: The information about the data item.
Raises:
KeyError: If necessary attributes are missing from the `value_info`.
"""
if 'ref' not in value_info:
return DataRef.from_value_info(value_info)
data_ref = DataRef.from_value_info(value_info['ref'])
origin = value_info['origin']
name = value_info['name']
tags = value_info['tags']
meta = value_info['meta']
parents = tuple(
DataInfo.from_value_info(parent_info)
for parent_info in value_info['parents']
)
return DataInfo(
data_ref,
origin,
parents,
name=name,
tags=tags,
meta=meta,
)
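`value_info()` and `from_value_info()` together give a round trip through plain dicts, which is what makes an item's metadata (including its nested `ref` and `parents`) storable as JSON. A small sketch of that property with a hypothetical, minimal `value_info` dict:

```python
import json

# Hypothetical value_info of an item with no parents, matching the dict
# shape produced by DataInfo.value_info().
value_info = {
    'ref': {'box_id': 'box-1', 'data_id': 'data-1', 'run_id': 'run-1'},
    'origin': 'origin-1',
    'name': None,
    'tags': {},
    'parents': [],
    'meta': {},
}

# The nested structure survives serialization unchanged, so
# DataInfo.from_value_info(json.loads(...)) can rebuild the item.
restored = json.loads(json.dumps(value_info))
assert restored == value_info
```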
load(self, value_type=None)
🔗
Load the content of the data item.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value_type | boxs.value_types.ValueType | The value type to use when loading the data. Defaults to `None`, in which case the same value type will be used that was used when the data was initially stored. | None |
Returns:
Type | Description |
---|---|
Any | The loaded data. |
Exceptions:
Type | Description |
---|---|
boxs.errors.BoxNotDefined | If the data is stored in an unknown box. |
boxs.errors.DataNotFound | If no data with the specific ids are stored in the referenced box. |
Source code in boxs/data.py
def load(self, value_type=None):
"""
Load the content of the data item.
Args:
value_type (boxs.value_types.ValueType): The value type to use when
loading the data. Defaults to `None`, in which case the same value
type will be used that was used when the data was initially stored.
Returns:
Any: The loaded data.
Raises:
boxs.errors.BoxNotDefined: If the data is stored in an unknown box.
boxs.errors.DataNotFound: If no data with the specific ids are stored
in the referenced box.
"""
return load(self, value_type=value_type)
value_info(self)
🔗
Returns information about this data item.
Returns:
Type | Description |
---|---|
Dict[str,str] | A dict containing information about this reference. |
Source code in boxs/data.py
def value_info(self):
"""
Returns information about this data item.
Returns:
Dict[str,str]: A dict containing information about this reference.
"""
value_info = {
'ref': self.ref.value_info(),
'origin': self.origin,
'name': self.name,
'tags': self.tags,
'parents': [parent.value_info() for parent in self.parents],
'meta': self.meta,
}
return value_info
DataRef
🔗
Reference to a DataInfo.
Source code in boxs/data.py
class DataRef:
"""
Reference to a DataInfo.
"""
__slots__ = [
'box_id',
'data_id',
'run_id',
'_info',
]
def __init__(self, box_id, data_id, run_id):
self.box_id = box_id
self.data_id = data_id
self.run_id = run_id
self._info = None
def value_info(self):
"""
Returns information about this reference.
Returns:
Dict[str,str]: A dict containing information about this reference.
"""
value_info = {
'box_id': self.box_id,
'data_id': self.data_id,
'run_id': self.run_id,
}
return value_info
@classmethod
def from_value_info(cls, value_info):
"""
Recreate a DataRef from its value_info.
Args:
value_info (Dict[str,str]): A dictionary containing the ids.
Returns:
boxs.data.DataRef: The DataRef referencing the data.
Raises:
KeyError: If necessary attributes are missing from the `value_info`.
"""
box_id = value_info['box_id']
data_id = value_info['data_id']
run_id = value_info['run_id']
data = DataRef(box_id, data_id, run_id)
return data
@property
def uri(self):
"""Return the URI of the data item referenced."""
return f'boxs://{self.box_id}/{self.data_id}/{self.run_id}'
@classmethod
def from_uri(cls, uri):
"""
Recreate a DataRef from a URI.
Args:
uri (str): URI in the format 'boxs://<box-id>/<data-id>/<run-id>'.
Returns:
DataRef: The DataRef referencing the data.
Raises:
ValueError: If the URI doesn't follow the expected format.
"""
url_parts = urllib.parse.urlparse(uri)
if url_parts.scheme != 'boxs':
raise ValueError("Invalid scheme")
box_id = url_parts.hostname
data_id, run_id = url_parts.path[1:].split('/', 1)
data = DataRef(box_id, data_id, run_id)
return data
@classmethod
def from_item(cls, item):
"""
Recreate a DataRef from an Item.
Args:
item (boxs.storage.Item): The item which describes the data we want to
refer to.
Returns:
DataRef: The DataRef referencing the data.
"""
return DataRef(item.box_id, item.data_id, item.run_id)
@property
def info(self):
"""
Returns the info object describing the referenced data item.
Returns:
boxs.data.DataInfo: The info about the data item referenced.
"""
if self._info is None:
self._info = info(self)
return self._info
def load(self, value_type=None):
"""
Load the content of the data item.
Args:
value_type (boxs.value_types.ValueType): The value type to use when
loading the data. Defaults to `None`, in which case the same value
type will be used that was used when the data was initially stored.
Returns:
Any: The loaded data.
Raises:
boxs.errors.BoxNotDefined: If the data is stored in an unknown box.
boxs.errors.DataNotFound: If no data with the specific ids are stored
in the referenced box.
"""
return self.info.load(value_type=value_type)
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
return (
self.box_id == other.box_id
and self.data_id == other.data_id
and self.run_id == other.run_id
)
def __hash__(self):
return hash((self.box_id, self.data_id, self.run_id))
def __str__(self):
return self.uri
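`DataRef` derives `__eq__` and `__hash__` from its three ids, giving it value semantics: two refs to the same item compare equal and collapse in sets or dict keys. A stand-in class (not the real `DataRef`) showing just that behavior:

```python
class Ref:
    """Stand-in with DataRef-style value semantics over three ids."""

    def __init__(self, box_id, data_id, run_id):
        self.box_id, self.data_id, self.run_id = box_id, data_id, run_id

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return (self.box_id, self.data_id, self.run_id) == (
            other.box_id, other.data_id, other.run_id)

    def __hash__(self):
        # Equal objects must hash equal, so hash the same id tuple.
        return hash((self.box_id, self.data_id, self.run_id))


# Two independently constructed refs to the same item collapse in a set.
refs = {Ref('b', 'd', 'r'), Ref('b', 'd', 'r')}
assert len(refs) == 1
```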
info
property
readonly
🔗
Returns the info object describing the referenced data item.
Returns:
Type | Description |
---|---|
boxs.data.DataInfo | The info about the data item referenced. |
uri
property
readonly
🔗
Return the URI of the data item referenced.
from_item(item)
classmethod
🔗
Recreate a DataRef from an Item.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item | boxs.storage.Item | The item which describes the data we want to refer to. | required |
Returns:
Type | Description |
---|---|
DataRef | The DataRef referencing the data. |
Source code in boxs/data.py
@classmethod
def from_item(cls, item):
"""
Recreate a DataRef from an Item.
Args:
item (boxs.storage.Item): The item which describes the data we want to
refer to.
Returns:
DataRef: The DataRef referencing the data.
"""
return DataRef(item.box_id, item.data_id, item.run_id)
from_uri(uri)
classmethod
🔗
Recreate a DataRef from a URI.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uri | str | URI in the format 'boxs://<box-id>/<data-id>/<run-id>'. | required |
Returns:
Type | Description |
---|---|
DataRef | The DataRef referencing the data. |
Exceptions:
Type | Description |
---|---|
ValueError | If the URI doesn't follow the expected format. |
Source code in boxs/data.py
@classmethod
def from_uri(cls, uri):
"""
Recreate a DataRef from a URI.
Args:
uri (str): URI in the format 'boxs://<box-id>/<data-id>/<run-id>'.
Returns:
DataRef: The DataRef referencing the data.
Raises:
ValueError: If the URI doesn't follow the expected format.
"""
url_parts = urllib.parse.urlparse(uri)
if url_parts.scheme != 'boxs':
raise ValueError("Invalid scheme")
box_id = url_parts.hostname
data_id, run_id = url_parts.path[1:].split('/', 1)
data = DataRef(box_id, data_id, run_id)
return data
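`from_uri` leans entirely on `urllib.parse.urlparse`: the URI's hostname becomes the box id and the path carries data id and run id. The same stdlib parsing, standalone (note that `urlparse` lowercases the hostname, so box ids round-trip cleanly only when lowercase):

```python
import urllib.parse

# A well-formed boxs URI as produced by DataRef.uri.
uri = 'boxs://my-box/data-1/run-1'

parts = urllib.parse.urlparse(uri)
assert parts.scheme == 'boxs'

box_id = parts.hostname                       # 'my-box'
# Strip the leading '/' and split once: the run id may itself contain '/'.
data_id, run_id = parts.path[1:].split('/', 1)
```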
from_value_info(value_info)
classmethod
🔗
Recreate a DataRef from its value_info.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value_info | Dict[str,str] | A dictionary containing the ids. | required |
Returns:
Type | Description |
---|---|
boxs.data.DataRef | The DataRef referencing the data. |
Exceptions:
Type | Description |
---|---|
KeyError | If necessary attributes are missing from the `value_info`. |
Source code in boxs/data.py
@classmethod
def from_value_info(cls, value_info):
"""
Recreate a DataRef from its value_info.
Args:
value_info (Dict[str,str]): A dictionary containing the ids.
Returns:
boxs.data.DataRef: The DataRef referencing the data.
Raises:
KeyError: If necessary attributes are missing from the `value_info`.
"""
box_id = value_info['box_id']
data_id = value_info['data_id']
run_id = value_info['run_id']
data = DataRef(box_id, data_id, run_id)
return data
load(self, value_type=None)
🔗
Load the content of the data item.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value_type | boxs.value_types.ValueType | The value type to use when loading the data. Defaults to `None`, in which case the same value type will be used that was used when the data was initially stored. | None |
Returns:
Type | Description |
---|---|
Any | The loaded data. |
Exceptions:
Type | Description |
---|---|
boxs.errors.BoxNotDefined | If the data is stored in an unknown box. |
boxs.errors.DataNotFound | If no data with the specific ids are stored in the referenced box. |
Source code in boxs/data.py
def load(self, value_type=None):
"""
Load the content of the data item.
Args:
value_type (boxs.value_types.ValueType): The value type to use when
loading the data. Defaults to `None`, in which case the same value
type will be used that was used when the data was initially stored.
Returns:
Any: The loaded data.
Raises:
boxs.errors.BoxNotDefined: If the data is stored in an unknown box.
boxs.errors.DataNotFound: If no data with the specific ids are stored
in the referenced box.
"""
return self.info.load(value_type=value_type)
value_info(self)
🔗
Returns information about this reference.
Returns:
Type | Description |
---|---|
Dict[str,str] | A dict containing information about this reference. |
Source code in boxs/data.py
def value_info(self):
"""
Returns information about this reference.
Returns:
Dict[str,str]: A dict containing information about this reference.
"""
value_info = {
'box_id': self.box_id,
'data_id': self.data_id,
'run_id': self.run_id,
}
return value_info
errors
🔗
Errors in boxs
BoxAlreadyDefined (BoxError)
🔗
Error that is raised if multiple boxes are defined using the same box id.
Attributes:
Name | Type | Description |
---|---|---|
box_id | str | The id of the box. |
Source code in boxs/errors.py
class BoxAlreadyDefined(BoxError):
"""
Error that is raised if multiple boxes are defined using the same box id.
Attributes:
box_id (str): The id of the box.
"""
def __init__(self, box_id):
self.box_id = box_id
super().__init__(f"Box with box id {self.box_id} already defined")
BoxError (BoxsError)
🔗
Base class for all errors related to boxes
Source code in boxs/errors.py
class BoxError(BoxsError):
"""Base class for all errors related to boxes"""
BoxNotDefined (BoxError)
🔗
Error that is raised if a box id refers to a non-defined box.
Attributes:
Name | Type | Description |
---|---|---|
box_id | str | The id of the box. |
Source code in boxs/errors.py
class BoxNotDefined(BoxError):
"""
Error that is raised if a box id refers to a non-defined box.
Attributes:
box_id (str): The id of the box.
"""
def __init__(self, box_id):
self.box_id = box_id
super().__init__(f"Box with box id {self.box_id} not defined")
BoxNotFound (BoxError)
🔗
Error that is raised if a box can't be found.
Attributes:
Name | Type | Description |
---|---|---|
box_id | str | The id of the box which should contain the data item. |
Source code in boxs/errors.py
class BoxNotFound(BoxError):
"""
Error that is raised if a box can't be found.
Attributes:
box_id (str): The id of the box which should contain the data item.
"""
def __init__(self, box_id):
self.box_id = box_id
super().__init__(f"Box {self.box_id} does not exist in storage.")
BoxsError (Exception)
🔗
Base class for all boxs specific errors
Source code in boxs/errors.py
class BoxsError(Exception):
"""Base class for all boxs specific errors"""
DataCollision (DataError)
🔗
Error that is raised if a newly created data item already exists.
Attributes:
Name | Type | Description |
---|---|---|
box_id | str | The id of the box containing the data item. |
data_id | str | The id of the data item. |
run_id | str | The id of the run when the data was created. |
Source code in boxs/errors.py
class DataCollision(DataError):
"""
Error that is raised if a newly created data item already exists.
Attributes:
box_id (str): The id of the box containing the data item.
data_id (str): The id of the data item.
run_id (str): The id of the run when the data was created.
"""
def __init__(self, box_id, data_id, run_id):
self.box_id = box_id
self.data_id = data_id
self.run_id = run_id
super().__init__(
f"Data {self.data_id} from run {self.run_id} "
f"already exists in box {self.box_id}"
)
DataError (BoxsError)
🔗
Base class for all boxs specific errors related to data
Source code in boxs/errors.py
class DataError(BoxsError):
"""Base class for all boxs specific errors related to data"""
DataNotFound (DataError)
🔗
Error that is raised if a data item can't be found.
Attributes:
Name | Type | Description |
---|---|---|
box_id | str | The id of the box which should contain the data item. |
data_id | str | The id of the data item. |
run_id | str | The id of the run when the data was created. |
Source code in boxs/errors.py
class DataNotFound(DataError):
"""
Error that is raised if a data item can't be found.
Attributes:
box_id (str): The id of the box which should contain the data item.
data_id (str): The id of the data item.
run_id (str): The id of the run when the data was created.
"""
def __init__(self, box_id, data_id, run_id):
self.box_id = box_id
self.data_id = data_id
self.run_id = run_id
super().__init__(
f"Data {self.data_id} from run {self.run_id} "
f"does not exist in box {self.box_id}"
)
MissingValueType (ValueTypeError)
🔗
Error that is raised if no ValueType can be found that supports the value.
Attributes:
Name | Type | Description |
---|---|---|
value | Any | The value for which no supporting ValueType was found. |
Source code in boxs/errors.py
class MissingValueType(ValueTypeError):
"""
Error that is raised if no ValueType can be found that supports the value.
Attributes:
value (Any): The value for which no supporting ValueType was found.
"""
def __init__(self, value):
self.value = value
super().__init__(f"No value type found for '{self.value}'.")
NameCollision (DataError)
🔗
Error that is raised if a data item with the same name already exists.
Attributes:
Name | Type | Description |
---|---|---|
box_id | str | The id of the box containing the data item. |
data_id | str | The id of the data item. |
run_id | str | The id of the run when the data was created. |
name | str | The name of the data item that is used twice. |
Source code in boxs/errors.py
class NameCollision(DataError):
"""
Error that is raised if a data item with the same name already exists.
Attributes:
box_id (str): The id of the box containing the data item.
data_id (str): The id of the data item.
run_id (str): The id of the run when the data was created.
name (str): The name of the data item that is used twice.
"""
def __init__(self, box_id, data_id, run_id, name):
self.box_id = box_id
self.data_id = data_id
self.run_id = run_id
self.name = name
super().__init__(
f"There already exists a data item in run {self.run_id} with the "
f"name {self.name} in box {self.box_id}"
)
RunError (BoxsError)
🔗
Base class for all run specific errors
Source code in boxs/errors.py
class RunError(BoxsError):
"""Base class for all run specific errors"""
RunNotFound (RunError)
🔗
Error that is raised if a run can't be found.
Attributes:
Name | Type | Description |
---|---|---|
box_id | str | The id of the box which should contain the run. |
run_id | str | The id of the run. |
Source code in boxs/errors.py
class RunNotFound(RunError):
"""
Error that is raised if a run can't be found.
Attributes:
box_id (str): The id of the box which should contain the run.
run_id (str): The id of the run.
"""
def __init__(self, box_id, run_id):
self.box_id = box_id
self.run_id = run_id
super().__init__(f"Run {self.run_id} does not exist in box {self.box_id}")
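All of the errors above hang off the single `BoxsError` base class, which is what lets the CLI's `main()` catch every library error with one `except BoxsError` clause. A condensed sketch of that hierarchy and the catch-at-the-base pattern (reduced to three classes; the real module defines more):

```python
class BoxsError(Exception):
    """Base class for all boxs specific errors."""


class DataError(BoxsError):
    """Base class for data related errors."""


class DataNotFound(DataError):
    """Raised when a data item can't be found."""

    def __init__(self, box_id, data_id, run_id):
        super().__init__(
            f"Data {data_id} from run {run_id} does not exist in box {box_id}")


# Catching the base class handles any concrete error uniformly.
try:
    raise DataNotFound('box-1', 'data-1', 'run-1')
except BoxsError as error:
    message = str(error)
```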
filesystem
🔗
Store data in a local filesystem
FileSystemStorage (Storage)
🔗
Storage implementation that stores data items and meta-data in a directory.
Source code in boxs/filesystem.py
class FileSystemStorage(Storage):
"""Storage implementation that stores data items and meta-data in a directory."""
def __init__(self, directory):
"""
Create the storage.
Args:
directory (Union[str,pathlib.Path]): The path to the directory where the
data will be stored.
"""
self.root_directory = pathlib.Path(directory)
def _data_file_paths(self, item):
base_path = (
self.root_directory / item.box_id / 'data' / item.data_id / item.run_id
)
return base_path.with_suffix('.data'), base_path.with_suffix('.info')
def _run_file_path(self, item):
return self._runs_directory_path(item.box_id) / item.run_id / item.data_id
def _runs_directory_path(self, box_id):
path = self.root_directory / box_id / 'runs'
path.mkdir(parents=True, exist_ok=True)
return path
def _runs_names_directory_path(self, box_id):
path = self._runs_directory_path(box_id) / '_named'
path.mkdir(parents=True, exist_ok=True)
return path
def _run_directory_path(self, box_id, run_id):
return self._runs_directory_path(box_id) / run_id
def _box_directory_path(self, box_id):
return self.root_directory / box_id
def list_runs(self, box_id, limit=None, name_filter=None):
box_directory = self._box_directory_path(box_id)
logger.debug("List runs from directory %s", box_directory)
if not box_directory.exists():
raise BoxNotFound(box_id)
runs = self._list_runs_in_box(box_id)
runs = sorted(runs, key=lambda x: x.time, reverse=True)
if name_filter is not None:
runs = list(filter(lambda x: (x.name or '').startswith(name_filter), runs))
if limit is not None:
runs = runs[:limit]
return runs
def _list_runs_in_box(self, box_id):
runs_directory = self._runs_directory_path(box_id)
runs = [
self._create_run_from_run_path(box_id, path)
for path in runs_directory.iterdir()
if path.is_dir() and path != self._runs_names_directory_path(box_id)
]
return runs
def list_items(self, item_query):
box_id = item_query.box
box_directory = self._box_directory_path(box_id)
if not box_directory.exists():
raise BoxNotFound(box_id)
logger.debug("List items with query %s", item_query)
runs = self._list_runs_in_box(box_id)
if item_query.run:
runs = [
run
for run in runs
if run.run_id.startswith(item_query.run or '')
or (run.name or '').startswith(item_query.run or '')
]
runs = sorted(runs, key=lambda x: x.time)
all_items = []
for run in runs:
items = self._get_items_in_run(box_id, run.run_id)
items = sorted(items, key=lambda x: x.time)
all_items.extend(
(
item
for item in items
if item.data_id.startswith(item_query.data or '')
or (item.name or '').startswith(item_query.data or '')
)
)
return all_items
def set_run_name(self, box_id, run_id, name):
logger.debug("Set name of run %s in box %s to %s", run_id, box_id, name)
box_directory = self._box_directory_path(box_id)
if not box_directory.exists():
raise BoxNotFound(box_id)
run_directory = self._run_directory_path(box_id, run_id)
if not run_directory.exists():
raise RunNotFound(box_id, run_id)
run_path = self._run_directory_path(box_id, run_id)
self._remove_name_for_run(box_id, run_id)
if name is not None:
self._set_name_for_run_path(box_id, name, run_path)
run = self._create_run_from_run_path(box_id, run_path)
return run
def delete_run(self, box_id, run_id):
run_directory = self._run_directory_path(box_id, run_id)
if not run_directory.exists():
raise RunNotFound(box_id, run_id)
items = self._get_items_in_run(box_id, run_id)
for item in items:
data_file, info_file = self._data_file_paths(item)
data_file.unlink()
info_file.unlink()
shutil.rmtree(run_directory)
def create_writer(self, item, name=None, tags=None):
logger.debug("Create writer for %s", item)
tags = tags or {}
data_file, info_file = self._data_file_paths(item)
run_file = self._run_file_path(item)
return _FileSystemWriter(item, name, tags, data_file, info_file, run_file)
def create_reader(self, item):
logger.debug("Create reader for %s", item)
data_file, info_file = self._data_file_paths(item)
return _FileSystemReader(item, data_file, info_file)
def _get_run_names(self, box_id):
name_directory = self._runs_names_directory_path(box_id)
run_names = {}
for named_link_file in name_directory.iterdir():
name = named_link_file.name
resolved_run_dir = named_link_file.resolve()
run_id = resolved_run_dir.name
run_names[run_id] = name
return run_names
def _set_name_for_run_path(self, box_id, name, run_path):
name_dir = self._runs_names_directory_path(box_id)
name_dir.mkdir(exist_ok=True)
name_symlink_file = name_dir / name
symlink_path = os.path.relpath(run_path, name_dir)
name_symlink_file.symlink_to(symlink_path)
def _remove_name_for_run(self, box_id, run_id):
run_names = self._get_run_names(box_id)
if run_id in run_names:
name_dir = self._runs_names_directory_path(box_id)
name_symlink_file = name_dir / run_names[run_id]
name_symlink_file.unlink()
def _get_items_in_run(self, box_id, run_id):
named_items = self._get_item_names_in_run(box_id, run_id)
items = [
Item(
box_id,
path.name,
run_id,
named_items.get(path.name, ''),
datetime.datetime.fromtimestamp(
path.stat().st_mtime,
tz=datetime.timezone.utc,
),
)
for path in self._run_directory_path(box_id, run_id).iterdir()
if path.is_file()
]
return items
def _get_item_names_in_run(self, box_id, run_id):
name_directory = self._run_directory_path(box_id, run_id) / '_named'
named_items = {}
if name_directory.exists():
for named_link_file in name_directory.iterdir():
name = named_link_file.name
resolved_info_file = named_link_file.resolve()
data_id = resolved_info_file.name
named_items[data_id] = name
return named_items
def _create_run_from_run_path(self, box_id, run_path):
run_names = self._get_run_names(box_id)
run_id = run_path.name
return Run(
box_id,
run_id,
run_names.get(run_id),
datetime.datetime.fromtimestamp(
run_path.stat().st_mtime,
tz=datetime.timezone.utc,
),
)
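The on-disk layout that these path helpers produce can be sketched with a stand-in Item tuple; the root directory '/tmp/storage' and the ids below are illustrative assumptions, the path logic mirrors _data_file_paths:

```python
import collections
import pathlib

# Minimal stand-in for boxs.storage.Item; field names follow the docs above.
Item = collections.namedtuple('Item', 'box_id data_id run_id')

def data_file_paths(root, item):
    """Mirror of FileSystemStorage._data_file_paths: one '.data' and one
    '.info' file per (box_id, data_id, run_id) combination."""
    base = pathlib.Path(root) / item.box_id / 'data' / item.data_id / item.run_id
    return base.with_suffix('.data'), base.with_suffix('.info')

item = Item('my-box', 'data-1', 'run-1')
data_file, info_file = data_file_paths('/tmp/storage', item)
print(data_file)  # /tmp/storage/my-box/data/data-1/run-1.data
print(info_file)  # /tmp/storage/my-box/data/data-1/run-1.info
```

The same data_id can thus exist in several versions, one file pair per run.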
__init__(self, directory)
special
🔗
Create the storage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
directory |
Union[str,pathlib.Path] |
The path to the directory where the data will be stored. |
required |
Source code in boxs/filesystem.py
def __init__(self, directory):
"""
Create the storage.
Args:
directory (Union[str,pathlib.Path]): The path to the directory where the
data will be stored.
"""
self.root_directory = pathlib.Path(directory)
create_reader(self, item)
🔗
Creates a Reader instance that allows loading existing data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item |
boxs.storage.Item |
The item that should be read. |
required |
Returns:
Type | Description |
---|---|
boxs.storage.Reader |
The reader that will load the data from the storage. |
Source code in boxs/filesystem.py
def create_reader(self, item):
logger.debug("Create reader for %s", item)
data_file, info_file = self._data_file_paths(item)
return _FileSystemReader(item, data_file, info_file)
create_writer(self, item, name=None, tags=None)
🔗
Creates a Writer instance that allows storing new data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item |
boxs.storage.Item |
The new data item. |
required |
name |
str |
An optional name that can be used for referring to this item
within the run. Defaults to None. |
None |
tags |
Dict[str,str] |
A dictionary containing tags that can be used for grouping multiple items together. Defaults to an empty dictionary. |
None |
Returns:
Type | Description |
---|---|
boxs.storage.Writer |
The writer that will write the data into the storage. |
Source code in boxs/filesystem.py
def create_writer(self, item, name=None, tags=None):
logger.debug("Create writer for %s", item)
tags = tags or {}
data_file, info_file = self._data_file_paths(item)
run_file = self._run_file_path(item)
return _FileSystemWriter(item, name, tags, data_file, info_file, run_file)
delete_run(self, box_id, run_id)
🔗
Delete all the data of the specified run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
box_id |
str |
The box_id of the box in which the run is stored. |
required |
run_id |
str |
Run id of the run which should be deleted. |
required |
Source code in boxs/filesystem.py
def delete_run(self, box_id, run_id):
run_directory = self._run_directory_path(box_id, run_id)
if not run_directory.exists():
raise RunNotFound(box_id, run_id)
items = self._get_items_in_run(box_id, run_id)
for item in items:
data_file, info_file = self._data_file_paths(item)
data_file.unlink()
info_file.unlink()
shutil.rmtree(run_directory)
list_items(self, item_query)
🔗
List all items that match a given query.
The item query can contain parts of the box id, the run id or run name, and the data id or data name. If a query value is not set (== None) it is not used as a filter criterion.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item_query |
boxs.storage.ItemQuery |
The query which defines which items should be listed. |
required |
Returns:
Type | Description |
---|---|
List[boxs.storage.Item] |
The matching items. |
Source code in boxs/filesystem.py
def list_items(self, item_query):
box_id = item_query.box
box_directory = self._box_directory_path(box_id)
if not box_directory.exists():
raise BoxNotFound(box_id)
logger.debug("List items with query %s", item_query)
runs = self._list_runs_in_box(box_id)
if item_query.run:
runs = [
run
for run in runs
if run.run_id.startswith(item_query.run or '')
or (run.name or '').startswith(item_query.run or '')
]
runs = sorted(runs, key=lambda x: x.time)
all_items = []
for run in runs:
items = self._get_items_in_run(box_id, run.run_id)
items = sorted(items, key=lambda x: x.time)
all_items.extend(
(
item
for item in items
if item.data_id.startswith(item_query.data or '')
or (item.name or '').startswith(item_query.data or '')
)
)
return all_items
list_runs(self, box_id, limit=None, name_filter=None)
🔗
List the runs within a box stored in this storage.
The runs should be returned in descending order of their start time.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
box_id |
str |
The box_id of the box whose runs should be listed. |
required |
limit |
Optional[int] |
Limits the returned runs to a maximum number. Defaults to None. |
None |
name_filter |
Optional[str] |
If set, only include runs which have names
that have the filter as prefix. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
List[boxs.storage.Run] |
The runs. |
Source code in boxs/filesystem.py
def list_runs(self, box_id, limit=None, name_filter=None):
box_directory = self._box_directory_path(box_id)
logger.debug("List runs from directory %s", box_directory)
if not box_directory.exists():
raise BoxNotFound(box_id)
runs = self._list_runs_in_box(box_id)
runs = sorted(runs, key=lambda x: x.time, reverse=True)
if name_filter is not None:
runs = list(filter(lambda x: (x.name or '').startswith(name_filter), runs))
if limit is not None:
runs = runs[:limit]
return runs
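The ordering semantics of list_runs (newest first, optional name-prefix filter, optional limit) can be illustrated with stand-in Run tuples; the run ids and names below are made up for the example:

```python
import collections
import datetime

Run = collections.namedtuple('Run', 'box_id run_id name time')

def list_runs(runs, limit=None, name_filter=None):
    # Newest first, then optional name-prefix filter, then optional limit --
    # the same order of operations as FileSystemStorage.list_runs.
    runs = sorted(runs, key=lambda r: r.time, reverse=True)
    if name_filter is not None:
        runs = [r for r in runs if (r.name or '').startswith(name_filter)]
    if limit is not None:
        runs = runs[:limit]
    return runs

t = datetime.datetime(2024, 1, 1)
runs = [
    Run('box', 'r1', 'nightly-1', t),
    Run('box', 'r2', 'release', t + datetime.timedelta(hours=1)),
    Run('box', 'r3', 'nightly-2', t + datetime.timedelta(hours=2)),
]
newest_two = [r.run_id for r in list_runs(runs, limit=2)]
nightly = [r.run_id for r in list_runs(runs, name_filter='nightly')]
print(newest_two)  # ['r3', 'r2']
print(nightly)     # ['r3', 'r1']
```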
set_run_name(self, box_id, run_id, name)
🔗
Set the name of a run.
The name can be updated, or removed by providing None.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
box_id |
str |
The box_id of the box in which the run is stored. |
required |
run_id |
str |
Run id of the run which should be named. |
required |
name |
Optional[str] |
New name of the run. If None, an existing name will be removed. |
required |
Returns:
Type | Description |
---|---|
boxs.storage.Run |
The run with its new name. |
Source code in boxs/filesystem.py
def set_run_name(self, box_id, run_id, name):
logger.debug("Set name of run %s in box %s to %s", run_id, box_id, name)
box_directory = self._box_directory_path(box_id)
if not box_directory.exists():
raise BoxNotFound(box_id)
run_directory = self._run_directory_path(box_id, run_id)
if not run_directory.exists():
raise RunNotFound(box_id, run_id)
run_path = self._run_directory_path(box_id, run_id)
self._remove_name_for_run(box_id, run_id)
if name is not None:
self._set_name_for_run_path(box_id, name, run_path)
run = self._create_run_from_run_path(box_id, run_path)
return run
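Run names are implemented as relative symlinks in a '_named' directory that point at the run directory, so a name can be resolved back to a run id. A minimal sketch of that scheme, using a temporary directory and a made-up run id:

```python
import os
import pathlib
import tempfile

# Sketch of the symlink-based run naming used by FileSystemStorage: a name
# is a symlink in the '_named' directory pointing at the run directory.
root = pathlib.Path(tempfile.mkdtemp())
run_dir = root / 'runs' / 'run-1234'
run_dir.mkdir(parents=True)
name_dir = root / 'runs' / '_named'
name_dir.mkdir()

# Same relative-symlink scheme as _set_name_for_run_path.
(name_dir / 'baseline').symlink_to(os.path.relpath(run_dir, name_dir))

# Resolving the symlink recovers the run id, as in _get_run_names.
resolved_run_id = (name_dir / 'baseline').resolve().name
print(resolved_run_id)  # run-1234
```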
graph
🔗
Functions for creating dependency graphs
write_graph_of_refs(writer, refs)
🔗
Write the dependency graph in DOT format for the given refs to the writer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
writer |
io.TextIO |
A text stream, to which the graph definition will be written. |
required |
refs |
list[boxs.data.DataRef] |
A list of DataRef instances for which the dependency graph will be created. |
required |
Source code in boxs/graph.py
def write_graph_of_refs(writer, refs):
"""
Write the dependency graph in DOT format for the given refs to the writer.
Args:
writer (io.TextIO): A text stream, to which the graph definition will be
written.
refs (list[boxs.data.DataRef]): A list of DataRef instances for which the
dependency graph will be created.
"""
writer.write("digraph {\n")
infos_by_run = collections.defaultdict(list)
visited = set()
queue = collections.deque()
queue.extend(refs)
while queue:
ref = queue.popleft()
if ref.uri in visited:
continue
info = ref.info
infos_by_run[ref.run_id].append(info)
for parent in info.parents:
queue.appendleft(parent)
visited.add(ref.uri)
for run_id, infos in infos_by_run.items():
writer.write(f' subgraph "cluster_{run_id}" {{\n')
writer.write(f' label="Run {run_id}";\n')
_write_nodes_for_infos(infos, writer)
writer.write(" }\n")
for run_id, infos in infos_by_run.items():
_write_edges_to_parents_for_infos(infos, writer)
writer.write("}\n")
io
🔗
Functions for I/O of data
DelegatingStream (RawIOBase)
🔗
Stream that delegates to another stream.
Source code in boxs/io.py
class DelegatingStream(io.RawIOBase):
"""Stream that delegates to another stream."""
def __init__(self, delegate):
"""
Creates a new DelegatingStream.
Args:
delegate (io.RawIOBase): The delegate stream.
"""
self.delegate = delegate
super().__init__()
def close(self):
self.delegate.close()
@property
def closed(self):
"""Property that returns if a stream is closed."""
return self.delegate.closed
def flush(self):
self.delegate.flush()
def seek(self, offset, whence=io.SEEK_SET):
return self.delegate.seek(offset, whence)
def seekable(self):
return self.delegate.seekable()
def tell(self):
return self.delegate.tell()
def truncate(self, size=None):
return self.delegate.truncate(size)
def writable(self):
return self.delegate.writable()
def readinto(self, byte_buffer):
return self.delegate.readinto(byte_buffer)
def write(self, byte_buffer):
return self.delegate.write(byte_buffer)
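A condensed copy of the class can be exercised against an in-memory delegate; the readable() override below is an addition for introspection and is not part of the original listing:

```python
import io

class DelegatingStream(io.RawIOBase):
    """Condensed copy of the class above: operations are forwarded to
    the wrapped delegate stream."""
    def __init__(self, delegate):
        self.delegate = delegate
        super().__init__()
    def readable(self):
        return self.delegate.readable()
    def writable(self):
        return self.delegate.writable()
    def seekable(self):
        return self.delegate.seekable()
    def seek(self, offset, whence=io.SEEK_SET):
        return self.delegate.seek(offset, whence)
    def readinto(self, byte_buffer):
        return self.delegate.readinto(byte_buffer)
    def write(self, byte_buffer):
        return self.delegate.write(byte_buffer)

# Everything written through the wrapper lands in the delegate and can
# be read back after seeking to the start.
stream = DelegatingStream(io.BytesIO())
stream.write(b'hello')
stream.seek(0)
data = stream.read()
print(data)  # b'hello'
```

Because only readinto() and write() touch the payload, subclasses (such as counting or hashing wrappers) can intercept the data in exactly those two places.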
closed
property
readonly
🔗
Property that returns whether the stream is closed.
__init__(self, delegate)
special
🔗
Creates a new DelegatingStream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
delegate |
io.RawIOBase |
The delegate stream. |
required |
Source code in boxs/io.py
def __init__(self, delegate):
"""
Creates a new DelegatingStream.
Args:
delegate (io.RawIOBase): The delegate stream.
"""
self.delegate = delegate
super().__init__()
close(self)
🔗
Flush and close the IO object.
This method has no effect if the file is already closed.
Source code in boxs/io.py
def close(self):
self.delegate.close()
flush(self)
🔗
Flush write buffers, if applicable.
This is not implemented for read-only and non-blocking streams.
Source code in boxs/io.py
def flush(self):
self.delegate.flush()
seek(self, offset, whence=0)
🔗
Change stream position.
Change the stream position to the given byte offset. The offset is interpreted relative to the position indicated by whence. Values for whence are:
- 0 -- start of stream (the default); offset should be zero or positive
- 1 -- current stream position; offset may be negative
- 2 -- end of stream; offset is usually negative
Return the new absolute position.
Source code in boxs/io.py
def seek(self, offset, whence=io.SEEK_SET):
return self.delegate.seek(offset, whence)
seekable(self)
🔗
Return whether object supports random access.
If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().
Source code in boxs/io.py
def seekable(self):
return self.delegate.seekable()
tell(self)
🔗
Return current stream position.
Source code in boxs/io.py
def tell(self):
return self.delegate.tell()
truncate(self, size=None)
🔗
Truncate file to size bytes.
File pointer is left unchanged. Size defaults to the current IO position as reported by tell(). Returns the new size.
Source code in boxs/io.py
def truncate(self, size=None):
return self.delegate.truncate(size)
writable(self)
🔗
Return whether object was opened for writing.
If False, write() will raise OSError.
Source code in boxs/io.py
def writable(self):
return self.delegate.writable()
origin
🔗
Origins of data
ORIGIN_FROM_FUNCTION_NAME
🔗
OriginMappingFunction that uses the function_name as origin.
ORIGIN_FROM_NAME
🔗
OriginMappingFunction that uses the name as origin.
ORIGIN_FROM_TAGS
🔗
OriginMappingFunction that uses the tags in JSON format as origin.
OriginMappingFunction
🔗
A function that takes an OriginContext and returns the origin as a string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context |
boxs.origin.OriginContext |
The context from which to derive the origin. |
required |
Returns:
Type | Description |
---|---|
str |
The origin. |
OriginContext
🔗
Context from which an origin mapping function can derive the origin.
Attributes:
Name | Type | Description |
---|---|---|
function_name |
str |
The name of the calling function. |
arg_info |
inspect.ArgInfo |
A data structure that contains the arguments of the calling function. |
name |
str |
The name that was given to store(). |
tags |
Dict[str,str] |
The tags this item will be assigned to. |
Source code in boxs/origin.py
class OriginContext:
"""
Context from which an origin mapping function can derive the origin.
Attributes:
function_name (str): The name of the function that called.
arg_info (inspect.ArgInfo): A data structure that contains the arguments of
the function which called.
name (str): The name that was given to `store()`.
tags (Dict[str,str]): The tags this item will be assigned to.
"""
def __init__(self, name, tags, level=2):
frame = inspect.currentframe()
for _ in range(level):
frame = frame.f_back
self.function_name = frame.f_code.co_name
self.arg_info = inspect.getargvalues(frame)
self.name = name
self.tags = tags
determine_origin(origin, name=None, tags=None, level=2)
🔗
Determine an origin.
If the given origin is a callable, we run it and take its return value as the new origin.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
origin |
Union[str, OriginMappingFunction, Callable[[],str]] |
A string or a
callable that returns a string. The callable can either have no arguments
or a single argument of type boxs.origin.OriginContext. |
required |
name |
str |
Name that will be available in the OriginContext if needed. |
None |
tags |
Dict[str,str] |
Tags that will be available in the context if needed. |
None |
level |
int |
The levels on the stack that we should go back. Defaults to 2 which selects the calling frame of determine_origin(). |
2 |
Returns:
Type | Description |
---|---|
str |
The origin as string. |
Source code in boxs/origin.py
def determine_origin(origin, name=None, tags=None, level=2):
"""
Determine an origin.
If the given origin is a callable, we run it and take its return value as new
origin.
Args:
origin (Union[str, OriginMappingFunction, Callable[[],str]]): A string or a
callable that returns a string. The callable can either have no arguments
or a single argument of type `boxs.origin.OriginContext`.
name (str): Name that will be available in the OriginContext if needed.
tags (Dict[str,str]): Tags that will be available in the context if needed.
level (int): The levels on the stack that we should go back. Defaults to 2
which selects the calling frame of determine_origin().
Returns:
str: The origin as string.
"""
if callable(origin):
if inspect.signature(origin).parameters:
context = OriginContext(name, tags, level=level)
origin = origin(context)
else:
origin = origin()
if origin is None:
raise ValueError("No origin given (is 'None').")
return origin
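The string-or-callable resolution can be sketched with a condensed copy of the function; the context argument below is a plain value standing in for the real OriginContext:

```python
import inspect

def determine_origin(origin, context=None):
    # Condensed version of boxs.origin.determine_origin: strings pass
    # through, callables are invoked (with the context if they accept an
    # argument), and the result must not be None.
    if callable(origin):
        if inspect.signature(origin).parameters:
            origin = origin(context)
        else:
            origin = origin()
    if origin is None:
        raise ValueError("No origin given (is 'None').")
    return origin

plain = determine_origin('training-data')
computed = determine_origin(lambda: 'computed')
from_context = determine_origin(lambda ctx: f'from-{ctx}', context='ctx1')
print(plain, computed, from_context)
```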
pandas
🔗
Value type definitions for pandas specific classes
PandasDataFrameCsvValueType (StringValueType)
🔗
A value type for storing and loading pandas DataFrame.
Source code in boxs/pandas.py
class PandasDataFrameCsvValueType(StringValueType):
"""
A value type for storing and loading pandas DataFrame.
"""
def supports(self, value):
return isinstance(value, pandas.DataFrame)
def write_value_to_writer(self, value, writer):
with writer.as_stream() as stream, io.TextIOWrapper(
stream, encoding=self._default_encoding
) as text_writer:
value.to_csv(text_writer)
writer.meta['encoding'] = self._default_encoding
def read_value_from_reader(self, reader):
encoding = reader.meta.get('encoding', self._default_encoding)
with reader.as_stream() as stream:
text_stream = codecs.getreader(encoding)(stream)
setattr(text_stream, 'mode', 'r')
result = pandas.read_csv(text_stream, encoding=encoding)
return result
read_value_from_reader(self, reader)
🔗
Read a value from the reader.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reader |
boxs.storage.Reader |
The reader from which the value should be read. |
required |
Returns:
Type | Description |
---|---|
Any |
The value that was read from the reader. |
Source code in boxs/pandas.py
def read_value_from_reader(self, reader):
encoding = reader.meta.get('encoding', self._default_encoding)
with reader.as_stream() as stream:
text_stream = codecs.getreader(encoding)(stream)
setattr(text_stream, 'mode', 'r')
result = pandas.read_csv(text_stream, encoding=encoding)
return result
supports(self, value)
🔗
Returns whether the value type can be used for reading and writing the given value.
This method is used to determine if a value can be read and written by a value type. It is only necessary if the value type should be picked up automatically; if it is only used explicitly, no check is performed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value for which the value type should be checked. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the value type supports the given value, otherwise False. |
Source code in boxs/pandas.py
def supports(self, value):
return isinstance(value, pandas.DataFrame)
write_value_to_writer(self, value, writer)
🔗
Write the given value to the writer.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be written. |
required |
writer |
boxs.storage.Writer |
The writer into which the value should be written. |
required |
Source code in boxs/pandas.py
def write_value_to_writer(self, value, writer):
with writer.as_stream() as stream, io.TextIOWrapper(
stream, encoding=self._default_encoding
) as text_writer:
value.to_csv(text_writer)
writer.meta['encoding'] = self._default_encoding
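The CSV round trip these two methods implement can be sketched with a plain in-memory stream in place of the storage-backed one; note that index_col=0 below undoes the index column that to_csv adds, which the storage-backed reader leaves in place:

```python
import io
import pandas

frame = pandas.DataFrame({'a': [1, 2], 'b': [3.0, 4.0]})

# Write the frame as CSV into an in-memory text stream.
buffer = io.StringIO()
frame.to_csv(buffer)  # the index becomes the first CSV column
buffer.seek(0)

# Read it back, treating the first column as the index again.
restored = pandas.read_csv(buffer, index_col=0)
round_tripped = restored.equals(frame)
print(round_tripped)  # True
```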
run
🔗
Functions for managing the run id.
get_run_id()
🔗
Returns the run id.
The run id is a unique identifier that is specific to an individual run of a workflow. It stays the same across all task executions within that run and can be used for tracking metrics and for differentiating between different runs of the same workflow, in which the task ids stay the same.
Returns:
Type | Description |
---|---|
str |
The unique run id. |
Source code in boxs/run.py
def get_run_id():
"""
Returns the run id.
The run id is a unique identifier that is specific to an individual run of a
workflow. It stays the same across all task executions and can be used for
tracking metrics and differentiating between different runs of the same workflow
where task_id and run_id stay the same.
Returns:
str: The unique run id.
"""
if _RUN_ID is None:
set_run_id(str(uuid.uuid1()))
return _RUN_ID
set_run_id(run_id)
🔗
Sets the run id.
Setting the run id explicitly is usually not necessary. The function is mainly used when task executions are run in a different process to make sure the run id is consistent with the spawning process, but it can be used e.g. if an external system provides a unique identifier for a specific workflow run.
When set_run_id(run_id) is being used, it must be run before the first tasks are actually defined.
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the run id was already set before. |
Source code in boxs/run.py
def set_run_id(run_id):
"""
Sets the run id.
Setting the run id explicitly is usually not necessary. The function is mainly
used when task executions are run in a different process to make sure the run id
is consistent with the spawning process, but it can be used e.g. if an external
system provides a unique identifier for a specific workflow run.
When `set_run_id(run_id)` is being used, it must be run before the first tasks
are actually defined.
Raises:
RuntimeError: If the run id was already set before.
"""
global _RUN_ID # pylint: disable=global-statement
if _RUN_ID is not None:
logger.error("run_id already set to %s when trying to set again", _RUN_ID)
raise RuntimeError("Run ID was already set")
logger.info("Set run_id to %s", run_id)
_RUN_ID = run_id
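The lifecycle of the run id can be sketched with a minimal copy of the module-level state: lazily generated on first access, stable afterwards, and refusing to be set twice:

```python
import uuid

# Minimal sketch of the module-level run id management in boxs.run.
_RUN_ID = None

def set_run_id(run_id):
    global _RUN_ID
    if _RUN_ID is not None:
        raise RuntimeError('Run ID was already set')
    _RUN_ID = run_id

def get_run_id():
    # Lazily generated on first access, then stable for the whole process.
    if _RUN_ID is None:
        set_run_id(str(uuid.uuid1()))
    return _RUN_ID

first = get_run_id()
second = get_run_id()
print(first == second)  # True: the id is stable within one run
try:
    set_run_id('other-id')
    double_set_rejected = False
except RuntimeError:
    double_set_rejected = True
print(double_set_rejected)  # True
```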
statistics
🔗
Collecting statistics about data
StatisticsTransformer (Transformer)
🔗
Transformer that collects statistics about data items.
This transformer gathers statistics, like the size of the data, the number of lines, or the time when it was stored, and adds those as additional values in the data's meta-data. The following meta-data values are set:
- 'size_in_bytes' as int
- 'number_of_lines' as int
- 'store_start' Timestamp in ISO-format when the storing of the data started.
- 'store_end' Timestamp in ISO-format when the storing of the data finished.
Source code in boxs/statistics.py
class StatisticsTransformer(Transformer):
"""
Transformer that collects statistics about data items.
This transformer gathers statistics like size of the data, number of lines in the
data or time when it was stored and adds those as additional values in the data's
meta-data. The following meta-data values are set:
- 'size_in_bytes' as int
- 'number_of_lines' as int
- 'store_start' Timestamp in ISO-format when the storing of the data started.
- 'store_end' Timestamp in ISO-format when the storing of the data finished.
"""
def transform_writer(self, writer):
return _StatisticsWriter(writer)
transform_writer(self, writer)
🔗
Transform a given writer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
writer |
boxs.storage.Writer |
Writer object that is used for writing new data content and meta-data. |
required |
Returns:
Type | Description |
---|---|
boxs.storage.Writer |
A modified writer that will be used instead. |
Source code in boxs/statistics.py
def transform_writer(self, writer):
return _StatisticsWriter(writer)
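The private _StatisticsWriter is not shown here, but the kind of counting it performs can be sketched with a hypothetical stream wrapper that tracks size and line count as data passes through:

```python
import io

class CountingStream(io.RawIOBase):
    """Hypothetical sketch, not the real _StatisticsWriter: count bytes
    and newlines while writing, in the spirit of the 'size_in_bytes' and
    'number_of_lines' meta-data values described above."""
    def __init__(self, delegate):
        self.delegate = delegate
        self.size_in_bytes = 0
        self.number_of_lines = 0
    def writable(self):
        return True
    def write(self, data):
        self.size_in_bytes += len(data)
        self.number_of_lines += data.count(b'\n')
        return self.delegate.write(data)

stream = CountingStream(io.BytesIO())
stream.write(b'line 1\nline 2\n')
print(stream.size_in_bytes, stream.number_of_lines)  # 14 2
```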
storage
🔗
Interface to backend storage
Item (Item)
🔗
A class representing a data item.
Source code in boxs/storage.py
class Item(collections.namedtuple('Item', 'box_id data_id run_id name time')):
"""
A class representing a data item.
"""
__slots__ = ()
def __str__(self):
return f"Item(boxs://{self.box_id}/{self.data_id}/{self.run_id})"
ItemQuery
🔗
Query object that allows to query a Storage for items.
The query is built from a string with up to 3 components separated by ':'.
The individual components are the <box-id>:<data-id>:<run-id>. A query doesn't have to contain all components, but it needs to contain at least one with its trailing ':'.
All components are treated as prefixes, so one doesn't have to write the full ids.
Examples:
Query all items in a specific run🔗
>>> ItemQuery('my-run-id')
# or with written separators
>>> ItemQuery('::my-run-id')
Query all items with the same data-id in all runs🔗
>>> ItemQuery('my-data-id:')
Query all items with the same data-id in specific runs with a shared prefix🔗
>>> ItemQuery('my-data-id:my-run')
# for multiple runs like e.g. my-run-1 and my-run-2
Query everything in a specific box:🔗
>>> ItemQuery('box-id::')
Attributes:
Name | Type | Description |
---|---|---|
box |
Optional[str] |
The optional box id. |
data |
Optional[str] |
The optional prefix for data ids or names. |
run |
Optional[str] |
The optional prefix for run ids or names. |
Source code in boxs/storage.py
class ItemQuery:
"""
Query object that allows to query a Storage for items.
The query is build from a string with up to 3 components separated by ':'.
The individual components are the <box-id>:<data-id>:<run-id>.
A query doesn't have to contain all components, but it needs to contain at least
one with its trailing ':'.
All components are treated as prefixes, so one doesn't have to write the full ids.
Examples:
# Query all items in a specific run
>>> ItemQuery('my-run-id')
# or with written separators
>>> ItemQuery('::my-run-id')
# Query all items with the same data-id in all runs
>>> ItemQuery('my-data-id:')
# Query all items with the same data-id in specific runs with a shared prefix
>>> ItemQuery('my-data-id:my-run')
# for multiple runs like e.g. my-run-1 and my-run-2
# Query everything in a specific box:
>>> ItemQuery('box-id::')
Attributes:
box (Optional[str]): The optional box id.
data (Optional[str]): The optional prefix for data ids or names.
run (Optional[str]): The optional prefix for run ids or names.
"""
def __init__(self, string):
parts = list(reversed(string.strip().rsplit(':')))
self.run = parts[0] or None
if len(parts) > 1:
self.data = parts[1] or None
else:
self.data = None
if len(parts) > 2:
self.box = parts[2] or None
else:
self.box = None
if len(parts) > 3:
raise ValueError("Invalid query, must be in format '<box>:<data>:<run>'.")
if self.run is None and self.data is None and self.box is None:
raise ValueError("Neither, box, data or run is specified.")
@classmethod
def from_fields(cls, box=None, data=None, run=None):
"""
Create an ItemQuery from the individual fields of the query.
Args:
box (Optional[str]): The search string for boxes. Defaults to `None`
matching all boxes.
data (Optional[str]): The search string for data items. Defaults to `None`
matching all data items.
run (Optional[str]): The search string for run. Defaults to `None`
matching all runs.
Returns:
ItemQuery: The new item query with the given search fields.
"""
return ItemQuery(':'.join([box or '', data or '', run or '']))
def __str__(self):
return ':'.join([self.box or '', self.data or '', self.run or ''])
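The parsing rule can be demonstrated with a condensed copy of the constructor logic; because the string is split from the right, the last component is always the run part:

```python
def parse_query(string):
    # Condensed copy of ItemQuery.__init__: split on ':' from the right
    # and read the components back as (box, data, run).
    parts = list(reversed(string.strip().rsplit(':')))
    run = parts[0] or None
    data = (parts[1] or None) if len(parts) > 1 else None
    box = (parts[2] or None) if len(parts) > 2 else None
    return box, data, run

print(parse_query('my-run-id'))    # (None, None, 'my-run-id')
print(parse_query('my-data-id:'))  # (None, 'my-data-id', None)
print(parse_query('box-id::'))     # ('box-id', None, None)
```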
from_fields(box=None, data=None, run=None)
classmethod
🔗
Create an ItemQuery from the individual fields of the query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
box |
Optional[str] |
The search string for boxes. Defaults to None, matching all boxes. |
None |
data |
Optional[str] |
The search string for data items. Defaults to None, matching all data items. |
None |
run |
Optional[str] |
The search string for runs. Defaults to None, matching all runs. |
None |
Returns:
Type | Description |
---|---|
ItemQuery |
The new item query with the given search fields. |
Source code in boxs/storage.py
@classmethod
def from_fields(cls, box=None, data=None, run=None):
"""
Create an ItemQuery from the individual fields of the query.
Args:
box (Optional[str]): The search string for boxes. Defaults to `None`
matching all boxes.
data (Optional[str]): The search string for data items. Defaults to `None`
matching all data items.
run (Optional[str]): The search string for run. Defaults to `None`
matching all runs.
Returns:
ItemQuery: The new item query with the given search fields.
"""
return ItemQuery(':'.join([box or '', data or '', run or '']))
Reader (ABC)
🔗
Base class for the storage specific reader implementations.
Source code in boxs/storage.py
class Reader(abc.ABC):
"""
Base class for the storage specific reader implementations.
"""
def __init__(self, item):
"""
Creates a `Reader` instance, that allows to load existing data.
Args:
item (boxs.storage.Item): The `item` with the data that should be
loaded.
"""
self._item = item
@property
def item(self):
"""The item of the data that this reader can read."""
return self._item
def read_value(self, value_type):
"""
Read the value and return it.
Args:
value_type (boxs.value_types.ValueType): The value type that reads the
value from the reader and converts it to the correct type.
Returns:
Any: The returned value from the `value_type`.
"""
return value_type.read_value_from_reader(self)
@property
@abc.abstractmethod
def info(self):
"""Dictionary containing information about the data."""
@property
def meta(self):
"""Dictionary containing the meta-data about the data."""
return self.info['meta']
@abc.abstractmethod
def as_stream(self):
"""
Return a stream from which the data content can be read.
Returns:
io.RawIOBase: A stream instance from which the data can be read.
"""
info
property
readonly
🔗
Dictionary containing information about the data.
item
property
readonly
🔗
The item of the data that this reader can read.
meta
property
readonly
🔗
Dictionary containing the meta-data about the data.
__init__(self, item)
special
🔗
Creates a Reader instance that allows loading existing data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item |
boxs.storage.Item |
The `item` with the data that should be loaded. |
required |
Source code in boxs/storage.py
def __init__(self, item):
"""
Creates a `Reader` instance, that allows to load existing data.
Args:
item (boxs.storage.Item): The `item` with the data that should be
loaded.
"""
self._item = item
as_stream(self)
🔗
Return a stream from which the data content can be read.
Returns:
Type | Description |
---|---|
io.RawIOBase |
A stream instance from which the data can be read. |
Source code in boxs/storage.py
@abc.abstractmethod
def as_stream(self):
"""
Return a stream from which the data content can be read.
Returns:
io.RawIOBase: A stream instance from which the data can be read.
"""
read_value(self, value_type)
🔗
Read the value and return it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value_type |
boxs.value_types.ValueType |
The value type that reads the value from the reader and converts it to the correct type. |
required |
Returns:
Type | Description |
---|---|
Any |
The returned value from the `value_type`. |
Source code in boxs/storage.py
def read_value(self, value_type):
"""
Read the value and return it.
Args:
value_type (boxs.value_types.ValueType): The value type that reads the
value from the reader and converts it to the correct type.
Returns:
Any: The returned value from the `value_type`.
"""
return value_type.read_value_from_reader(self)
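To make the contract concrete, here is a hypothetical in-memory reader built on a trimmed copy of the interface above. The `InMemoryReader` name and its payload handling are illustrative only, not part of boxs:

```python
import abc
import io

class Reader(abc.ABC):
    """Trimmed copy of the Reader interface shown above."""
    def __init__(self, item):
        self._item = item
    @property
    def item(self):
        return self._item
    def read_value(self, value_type):
        return value_type.read_value_from_reader(self)
    @property
    @abc.abstractmethod
    def info(self):
        """Dictionary containing information about the data."""
    @property
    def meta(self):
        return self.info['meta']
    @abc.abstractmethod
    def as_stream(self):
        """Return a stream from which the data content can be read."""

class InMemoryReader(Reader):
    """Hypothetical reader that serves a bytes payload from memory."""
    def __init__(self, item, payload, meta=None):
        super().__init__(item)
        self._payload = payload
        self._meta = meta or {}
    @property
    def info(self):
        return {'meta': self._meta}
    def as_stream(self):
        return io.BytesIO(self._payload)

reader = InMemoryReader('item-1', b'hello', meta={'encoding': 'utf-8'})
with reader.as_stream() as stream:
    assert stream.read() == b'hello'
assert reader.meta == {'encoding': 'utf-8'}
```

Note how `meta` comes for free once `info` is implemented, since the base class looks it up under the `'meta'` key.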
Run (Run)
🔗
A class representing a run.
Source code in boxs/storage.py
class Run(collections.namedtuple('Run', 'box_id run_id name time')):
"""
A class representing a run.
"""
__slots__ = ()
def __str__(self):
return f"Run({self.box_id}/{self.run_id})"
def __eq__(self, o):
return (self.box_id, self.run_id) == (o.box_id, o.run_id)
def __hash__(self):
return hash((self.box_id, self.run_id))
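Because `__eq__` and `__hash__` only take `box_id` and `run_id` into account, two `Run` tuples that differ in name or time still compare equal:

```python
import collections

class Run(collections.namedtuple('Run', 'box_id run_id name time')):
    """Copy of the Run class above, to demonstrate its equality semantics."""
    __slots__ = ()
    def __str__(self):
        return f"Run({self.box_id}/{self.run_id})"
    def __eq__(self, o):
        return (self.box_id, self.run_id) == (o.box_id, o.run_id)
    def __hash__(self):
        return hash((self.box_id, self.run_id))

# Same box and run id, different name: still considered the same run.
first = Run('my-box', 'run-1', None, '2021-01-01T10:00:00')
renamed = Run('my-box', 'run-1', 'experiment-a', '2021-01-02T09:00:00')
assert first == renamed
assert len({first, renamed}) == 1
assert str(first) == 'Run(my-box/run-1)'
```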
Storage (ABC)
🔗
Backend that allows a box to store and load data in arbitrary storage locations.
This abstract base class defines the interface that is used by `Box` to store and load data. The data items between `Box` and `Storage` are always identified by their `box_id`, `data_id` and `run_id`. The functionality to store data is provided by the `Writer` object that is created by the `create_writer()` method. Similarly, loading data is implemented in a separate `Reader` object that is created by `create_reader()`.
Source code in boxs/storage.py
class Storage(abc.ABC):
"""
Backend that allows a box to store and load data in arbitrary storage locations.
This abstract base class defines the interface, that is used by `Box` to store
and load data. The data items between `Box` and `Storage` are always identified
by their `box_id`, `data_id` and `run_id`. The functionality to store data is
provided by the `Writer` object, that is created by the `create_writer()` method.
Similarly, loading data is implemented in a separate `Reader` object that is
created by `create_reader()`.
"""
@abc.abstractmethod
def create_reader(self, item):
"""
Creates a `Reader` instance, that allows to load existing data.
Args:
item (boxs.storage.Item): The item that should be read.
Returns:
boxs.storage.Reader: The reader that will load the data from the
storage.
"""
@abc.abstractmethod
def create_writer(self, item, name=None, tags=None):
"""
Creates a `Writer` instance, that allows to store new data.
Args:
item (boxs.storage.Item): The new data item.
name (str): An optional name, that can be used for referring to this item
within the run. Defaults to `None`.
tags (Dict[str,str]): A dictionary containing tags that can be used for
grouping multiple items together. Defaults to an empty dictionary.
Returns:
boxs.storage.Writer: The writer that will write the data into the
storage.
"""
@abc.abstractmethod
def list_runs(self, box_id, limit=None, name_filter=None):
"""
List the runs within a box stored in this storage.
The runs should be returned in descending order of their start time.
Args:
box_id (str): `box_id` of the box in which to look for runs.
limit (Optional[int]): Limits the returned runs to maximum `limit` number.
Defaults to `None` in which case all runs are returned.
name_filter (Optional[str]): If set, only include runs which have names
that have the filter as prefix. Defaults to `None` in which case all
runs are returned.
Returns:
List[boxs.storage.Run]: The runs.
"""
@abc.abstractmethod
def list_items(self, item_query):
"""
List all items that match a given query.
The item query can contain parts of box id, run id or run name and data id or
data name. If a query value is not set (`== None`) it is not used as a filter
criteria.
Args:
item_query (boxs.storage.ItemQuery): The query which defines which items
should be listed.
Returns:
List[boxs.storage.Item]: The items.
"""
@abc.abstractmethod
def set_run_name(self, box_id, run_id, name):
"""
Set the name of a run.
The name can be updated and removed by providing `None`.
Args:
box_id (str): `box_id` of the box in which the run is stored.
run_id (str): Run id of the run which should be named.
name (Optional[str]): New name of the run. If `None`, an existing name
will be removed.
Returns:
boxs.storage.Run: The run with its new name.
"""
@abc.abstractmethod
def delete_run(self, box_id, run_id):
"""
Delete all the data of the specified run.
Args:
box_id (str): `box_id` of the box in which the run is stored.
run_id (str): Run id of the run which should be deleted.
"""
create_reader(self, item)
🔗
Creates a `Reader` instance that allows loading existing data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item |
boxs.storage.Item |
The item that should be read. |
required |
Returns:
Type | Description |
---|---|
boxs.storage.Reader |
The reader that will load the data from the storage. |
Source code in boxs/storage.py
@abc.abstractmethod
def create_reader(self, item):
"""
Creates a `Reader` instance, that allows to load existing data.
Args:
item (boxs.storage.Item): The item that should be read.
Returns:
boxs.storage.Reader: The reader that will load the data from the
storage.
"""
create_writer(self, item, name=None, tags=None)
🔗
Creates a `Writer` instance that allows storing new data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item |
boxs.storage.Item |
The new data item. |
required |
name |
str |
An optional name that can be used for referring to this item within the run. Defaults to `None`. |
None |
tags |
Dict[str,str] |
A dictionary containing tags that can be used for grouping multiple items together. Defaults to an empty dictionary. |
None |
Returns:
Type | Description |
---|---|
boxs.storage.Writer |
The writer that will write the data into the storage. |
Source code in boxs/storage.py
@abc.abstractmethod
def create_writer(self, item, name=None, tags=None):
"""
Creates a `Writer` instance, that allows to store new data.
Args:
item (boxs.storage.Item): The new data item.
name (str): An optional name, that can be used for referring to this item
within the run. Defaults to `None`.
tags (Dict[str,str]): A dictionary containing tags that can be used for
grouping multiple items together. Defaults to an empty dictionary.
Returns:
boxs.storage.Writer: The writer that will write the data into the
storage.
"""
delete_run(self, box_id, run_id)
🔗
Delete all the data of the specified run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
box_id |
str |
`box_id` of the box in which the run is stored. |
required |
run_id |
str |
Run id of the run which should be deleted. |
required |
Source code in boxs/storage.py
@abc.abstractmethod
def delete_run(self, box_id, run_id):
"""
Delete all the data of the specified run.
Args:
box_id (str): `box_id` of the box in which the run is stored.
run_id (str): Run id of the run which should be deleted.
"""
list_items(self, item_query)
🔗
List all items that match a given query.
The item query can contain parts of box id, run id or run name, and data id or data name. If a query value is not set (`== None`), it is not used as a filter criterion.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item_query |
boxs.storage.ItemQuery |
The query which defines which items should be listed. |
required |
Returns:
Type | Description |
---|---|
List[boxs.storage.Item] |
The items. |
Source code in boxs/storage.py
@abc.abstractmethod
def list_items(self, item_query):
"""
List all items that match a given query.
The item query can contain parts of box id, run id or run name and data id or
data name. If a query value is not set (`== None`) it is not used as a filter
criteria.
Args:
item_query (boxs.storage.ItemQuery): The query which defines which items
should be listed.
Returns:
List[boxs.storage.Item]: The items.
"""
list_runs(self, box_id, limit=None, name_filter=None)
🔗
List the runs within a box stored in this storage.
The runs should be returned in descending order of their start time.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
box_id |
str |
`box_id` of the box in which to look for runs. |
required |
limit |
Optional[int] |
Limits the returned runs to maximum `limit` number. Defaults to `None`, in which case all runs are returned. |
None |
name_filter |
Optional[str] |
If set, only include runs which have names that have the filter as prefix. Defaults to `None`, in which case all runs are returned. |
None |
Returns:
Type | Description |
---|---|
List[boxs.storage.Run] |
The runs. |
Source code in boxs/storage.py
@abc.abstractmethod
def list_runs(self, box_id, limit=None, name_filter=None):
"""
List the runs within a box stored in this storage.
The runs should be returned in descending order of their start time.
Args:
box_id (str): `box_id` of the box in which to look for runs.
limit (Optional[int]): Limits the returned runs to maximum `limit` number.
Defaults to `None` in which case all runs are returned.
name_filter (Optional[str]): If set, only include runs which have names
that have the filter as prefix. Defaults to `None` in which case all
runs are returned.
Returns:
List[boxs.storage.Run]: The runs.
"""
set_run_name(self, box_id, run_id, name)
🔗
Set the name of a run.
The name can be updated, or removed by providing `None`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
box_id |
str |
`box_id` of the box in which the run is stored. |
required |
run_id |
str |
Run id of the run which should be named. |
required |
name |
Optional[str] |
New name of the run. If `None`, an existing name will be removed. |
required |
Returns:
Type | Description |
---|---|
boxs.storage.Run |
The run with its new name. |
Source code in boxs/storage.py
@abc.abstractmethod
def set_run_name(self, box_id, run_id, name):
"""
Set the name of a run.
The name can be updated and removed by providing `None`.
Args:
box_id (str): `box_id` of the box in which the run is stored.
run_id (str): Run id of the run which should be named.
name (Optional[str]): New name of the run. If `None`, an existing name
will be removed.
Returns:
boxs.storage.Run: The run with its new name.
"""
Writer (ABC)
🔗
Base class for the storage specific writer implementations.
Source code in boxs/storage.py
class Writer(abc.ABC):
"""
Base class for the storage specific writer implementations.
"""
def __init__(self, item, name, tags):
"""
Creates a `Writer` instance, that allows to store new data.
Args:
item (boxs.storage.Item): The new item.
"""
self._item = item
self._name = name
self._tags = tags
self._meta = {}
@property
def item(self):
"""Returns the item which this writer writes to."""
return self._item
@property
def name(self):
"""Returns the name of the new data item."""
return self._name
@property
def tags(self):
"""Returns the tags of the new data item."""
return self._tags
@property
def meta(self):
"""
Returns a dictionary which contains meta-data of the item.
This allows either ValueTypes or Transformers to add additional
meta-data for the data item.
"""
return self._meta
def write_value(self, value, value_type):
"""
Write the data content to the storage.
Args:
value (Any): The value that should be written to the writer.
value_type (boxs.value_types.ValueType): The value type that takes care
of actually writing the value and converting it to the correct type.
"""
value_type.write_value_to_writer(value, self)
@abc.abstractmethod
def write_info(self, info):
"""
Write the info for the data item to the storage.
Args:
info (Dict[str,Any]): The information about the new data item.
Raises:
boxs.errors.DataCollision: If a data item with the same ids already
exists.
"""
@abc.abstractmethod
def as_stream(self):
"""
Return a stream to which the data content should be written.
This method can be used by the ValueType to actually transfer the data.
Returns:
io.RawIOBase: The binary io-stream.
Raises:
boxs.errors.DataCollision: If a data item with the same ids already
exists.
"""
item
property
readonly
🔗
Returns the item which this writer writes to.
meta
property
readonly
🔗
Returns a dictionary which contains meta-data of the item.
This allows either ValueTypes or Transformers to add additional meta-data for the data item.
name
property
readonly
🔗
Returns the name of the new data item.
tags
property
readonly
🔗
Returns the tags of the new data item.
__init__(self, item, name, tags)
special
🔗
Creates a `Writer` instance that allows storing new data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item |
boxs.storage.Item |
The new item. |
required |
Source code in boxs/storage.py
def __init__(self, item, name, tags):
"""
Creates a `Writer` instance, that allows to store new data.
Args:
item (boxs.storage.Item): The new item.
"""
self._item = item
self._name = name
self._tags = tags
self._meta = {}
as_stream(self)
🔗
Return a stream to which the data content should be written.
This method can be used by the ValueType to actually transfer the data.
Returns:
Type | Description |
---|---|
io.RawIOBase |
The binary io-stream. |
Exceptions:
Type | Description |
---|---|
boxs.errors.DataCollision |
If a data item with the same ids already exists. |
Source code in boxs/storage.py
@abc.abstractmethod
def as_stream(self):
"""
Return a stream to which the data content should be written.
This method can be used by the ValueType to actually transfer the data.
Returns:
io.RawIOBase: The binary io-stream.
Raises:
boxs.errors.DataCollision: If a data item with the same ids already
exists.
"""
write_info(self, info)
🔗
Write the info for the data item to the storage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info |
Dict[str,Any] |
The information about the new data item. |
required |
Exceptions:
Type | Description |
---|---|
boxs.errors.DataCollision |
If a data item with the same ids already exists. |
Source code in boxs/storage.py
@abc.abstractmethod
def write_info(self, info):
"""
Write the info for the data item to the storage.
Args:
info (Dict[str,Any]): The information about the new data item.
Raises:
boxs.errors.DataCollision: If a data item with the same ids already
exists.
"""
write_value(self, value, value_type)
🔗
Write the data content to the storage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be written to the writer. |
required |
value_type |
boxs.value_types.ValueType |
The value type that takes care of actually writing the value and converting it to the correct type. |
required |
Source code in boxs/storage.py
def write_value(self, value, value_type):
"""
Write the data content to the storage.
Args:
value (Any): The value that should be written to the writer.
value_type (boxs.value_types.ValueType): The value type that takes care
of actually writing the value and converting it to the correct type.
"""
value_type.write_value_to_writer(value, self)
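A hypothetical in-memory writer shows how the pieces fit together. `InMemoryWriter` and its buffer handling are illustrative stand-ins, not part of boxs:

```python
import abc
import io

class Writer(abc.ABC):
    """Trimmed copy of the Writer interface shown above."""
    def __init__(self, item, name, tags):
        self._item = item
        self._name = name
        self._tags = tags
        self._meta = {}
    @property
    def name(self):
        return self._name
    @property
    def tags(self):
        return self._tags
    @property
    def meta(self):
        return self._meta
    def write_value(self, value, value_type):
        value_type.write_value_to_writer(value, self)
    @abc.abstractmethod
    def write_info(self, info):
        """Write the info for the data item to the storage."""
    @abc.abstractmethod
    def as_stream(self):
        """Return a stream to which the data content should be written."""

class InMemoryWriter(Writer):
    """Hypothetical writer that keeps content and info in memory."""
    def __init__(self, item, name=None, tags=None):
        super().__init__(item, name, tags or {})
        self._buffer = io.BytesIO()
        self.stored_info = None
    def write_info(self, info):
        self.stored_info = info
    def as_stream(self):
        return self._buffer

writer = InMemoryWriter('item-1', name='features', tags={'stage': 'raw'})
writer.as_stream().write(b'payload')
writer.meta['size'] = 7   # e.g. a ValueType or Transformer adding meta-data
writer.write_info({'name': writer.name, 'tags': writer.tags, 'meta': writer.meta})
assert writer.stored_info['meta'] == {'size': 7}
```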
tensorflow
🔗
Value type definitions for storing tensorflow specific classes
TensorBoardLogDirValueType (DirectoryValueType)
🔗
Value type for storing tensorboard logs.
The necessary tensorflow functions are loaded dynamically, so the module can be imported WITHOUT tensorflow. The tensorflow package is only required once an instance of the class is created.
Source code in boxs/tensorflow.py
class TensorBoardLogDirValueType(DirectoryValueType):
"""
Value type for storing tensorboard logs.
The necessary tensorflow functions for saving and loading the model to a directory
are dynamically loaded, so that the module can be imported WITHOUT tensorflow.
Only if one instantiates an instance of the class, the tensorflow package must be
available.
"""
def write_value_to_writer(self, value, writer):
super().write_value_to_writer(pathlib.Path(value), writer)
writer.meta['dir_content'] = 'tensorboard-logs'
write_value_to_writer(self, value, writer)
🔗
Write the given value to the writer.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be written. |
required |
writer |
boxs.storage.Writer |
The writer into which the value should be written. |
required |
Source code in boxs/tensorflow.py
def write_value_to_writer(self, value, writer):
super().write_value_to_writer(pathlib.Path(value), writer)
writer.meta['dir_content'] = 'tensorboard-logs'
TensorflowKerasModelValueType (DirectoryValueType)
🔗
Value type for storing tensorflow keras models.
The necessary tensorflow functions for saving and loading the model to a directory are loaded dynamically, so the module can be imported WITHOUT tensorflow. The tensorflow package is only required once an instance of the class is created.
Source code in boxs/tensorflow.py
class TensorflowKerasModelValueType(DirectoryValueType):
"""
Value type for storing tensorflow keras models.
The necessary tensorflow functions for saving and loading the model to a directory
are dynamically loaded, so that the module can be imported WITHOUT tensorflow.
Only if one instantiates an instance of the class, the tensorflow package must be
available.
"""
def __init__(self, dir_path=None, default_format='tf'):
self._tf_models_module = importlib.import_module('tensorflow.keras.models')
self._default_format = default_format
super().__init__(dir_path)
def supports(self, value):
return False
def write_value_to_writer(self, value, writer):
model_dir_path = pathlib.Path(tempfile.mkdtemp())
try:
self._tf_models_module.save_model(
value, filepath=model_dir_path, save_format=self._default_format
)
super().write_value_to_writer(model_dir_path, writer)
writer.meta['model_format'] = self._default_format
finally:
shutil.rmtree(model_dir_path)
def read_value_from_reader(self, reader):
model_dir_path = super().read_value_from_reader(reader)
try:
result = self._tf_models_module.load_model(filepath=model_dir_path)
finally:
if self._dir_path is None:
shutil.rmtree(model_dir_path)
return result
def _get_parameter_string(self):
return self._default_format
@classmethod
def _from_parameter_string(cls, parameters):
return cls(default_format=parameters)
read_value_from_reader(self, reader)
🔗
Read a value from the reader.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reader |
boxs.storage.Reader |
The reader from which the value should be read. |
required |
Returns:
Type | Description |
---|---|
Any |
The value that was read from the reader. |
Source code in boxs/tensorflow.py
def read_value_from_reader(self, reader):
model_dir_path = super().read_value_from_reader(reader)
try:
result = self._tf_models_module.load_model(filepath=model_dir_path)
finally:
if self._dir_path is None:
shutil.rmtree(model_dir_path)
return result
supports(self, value)
🔗
Returns whether the value type can be used for reading and writing the given value.
This method is used to determine if a value can be read and written by a value type. It is only needed if the value type should be picked up automatically; if it is used explicitly, no check is performed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value for which the value type should be checked. |
required |
Returns:
Type | Description |
---|---|
bool |
`True` if the value type supports the given value, otherwise `False`. |
Source code in boxs/tensorflow.py
def supports(self, value):
return False
write_value_to_writer(self, value, writer)
🔗
Write the given value to the writer.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be written. |
required |
writer |
boxs.storage.Writer |
The writer into which the value should be written. |
required |
Source code in boxs/tensorflow.py
def write_value_to_writer(self, value, writer):
model_dir_path = pathlib.Path(tempfile.mkdtemp())
try:
self._tf_models_module.save_model(
value, filepath=model_dir_path, save_format=self._default_format
)
super().write_value_to_writer(model_dir_path, writer)
writer.meta['model_format'] = self._default_format
finally:
shutil.rmtree(model_dir_path)
transform
🔗
Transforming data items
DelegatingReader (Reader)
🔗
Reader class that delegates all calls to a wrapped reader.
Source code in boxs/transform.py
class DelegatingReader(Reader):
"""
Reader class that delegates all calls to a wrapped reader.
"""
def __init__(self, delegate):
"""
Create a new DelegatingReader.
Args:
delegate (boxs.storage.Reader): The reader to which all calls are
delegated.
"""
super().__init__(delegate.item)
self.delegate = delegate
@property
def info(self):
return self.delegate.info
@property
def meta(self):
return self.delegate.meta
def read_value(self, value_type):
return self.delegate.read_value(value_type)
def as_stream(self):
return self.delegate.as_stream()
info
property
readonly
🔗
Dictionary containing information about the data.
meta
property
readonly
🔗
Dictionary containing the meta-data about the data.
__init__(self, delegate)
special
🔗
Create a new DelegatingReader.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
delegate |
boxs.storage.Reader |
The reader to which all calls are delegated. |
required |
Source code in boxs/transform.py
def __init__(self, delegate):
"""
Create a new DelegatingReader.
Args:
delegate (boxs.storage.Reader): The reader to which all calls are
delegated.
"""
super().__init__(delegate.item)
self.delegate = delegate
as_stream(self)
🔗
Return a stream from which the data content can be read.
Returns:
Type | Description |
---|---|
io.RawIOBase |
A stream instance from which the data can be read. |
Source code in boxs/transform.py
def as_stream(self):
return self.delegate.as_stream()
read_value(self, value_type)
🔗
Read the value and return it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value_type |
boxs.value_types.ValueType |
The value type that reads the value from the reader and converts it to the correct type. |
required |
Returns:
Type | Description |
---|---|
Any |
The returned value from the `value_type`. |
Source code in boxs/transform.py
def read_value(self, value_type):
return self.delegate.read_value(value_type)
DelegatingWriter (Writer)
🔗
Writer that delegates all calls to a wrapped writer.
Source code in boxs/transform.py
class DelegatingWriter(Writer):
"""
Writer that delegates all calls to a wrapped writer.
"""
def __init__(self, delegate):
self.delegate = delegate
super().__init__(delegate.item, delegate.name, delegate.tags)
@property
def meta(self):
return self.delegate.meta
def write_value(self, value, value_type):
self.delegate.write_value(value, value_type)
def write_info(self, info):
return self.delegate.write_info(info)
def as_stream(self):
return self.delegate.as_stream()
meta
property
readonly
🔗
Returns a dictionary which contains meta-data of the item.
This allows either ValueTypes or Transformers to add additional meta-data for the data item.
as_stream(self)
🔗
Return a stream to which the data content should be written.
This method can be used by the ValueType to actually transfer the data.
Returns:
Type | Description |
---|---|
io.RawIOBase |
The binary io-stream. |
Exceptions:
Type | Description |
---|---|
boxs.errors.DataCollision |
If a data item with the same ids already exists. |
Source code in boxs/transform.py
def as_stream(self):
return self.delegate.as_stream()
write_info(self, info)
🔗
Write the info for the data item to the storage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info |
Dict[str,Any] |
The information about the new data item. |
required |
Exceptions:
Type | Description |
---|---|
boxs.errors.DataCollision |
If a data item with the same ids already exists. |
Source code in boxs/transform.py
def write_info(self, info):
return self.delegate.write_info(info)
write_value(self, value, value_type)
🔗
Write the data content to the storage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be written to the writer. |
required |
value_type |
boxs.value_types.ValueType |
The value type that takes care of actually writing the value and converting it to the correct type. |
required |
Source code in boxs/transform.py
def write_value(self, value, value_type):
self.delegate.write_value(value, value_type)
Transformer
🔗
Base class for transformers
Transformers allow modifying content and meta-data of a DataItem during store and load by wrapping the writer and reader that are used for accessing them from the storage. This can be useful, e.g., for adding new meta-data, filtering content, or implementing encryption.
Source code in boxs/transform.py
class Transformer:
# pylint: disable=no-self-use
"""
Base class for transformers
Transformers allow modifying content and meta-data of a DataItem during store and
load by wrapping the writer and reader that are used for accessing them from the
storage. This can be useful for e.g. adding new meta-data, filtering content or
implementing encryption.
"""
def transform_writer(self, writer):
"""
Transform a given writer.
Args:
writer (boxs.storage.Writer): Writer object that is used for writing
new data content and meta-data.
Returns:
boxs.storage.Writer: A modified writer that will be used instead.
"""
return writer
def transform_reader(self, reader):
"""
Transform a given reader.
Args:
reader (boxs.storage.Reader): Reader object that is used for reading
data content and meta-data.
Returns:
boxs.storage.Reader: A modified reader that will be used instead.
"""
return reader
transform_reader(self, reader)
🔗
Transform a given reader.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reader |
boxs.storage.Reader |
Reader object that is used for reading data content and meta-data. |
required |
Returns:
Type | Description |
---|---|
boxs.storage.Reader |
A modified reader that will be used instead. |
Source code in boxs/transform.py
def transform_reader(self, reader):
"""
Transform a given reader.
Args:
reader (boxs.storage.Reader): Reader object that is used for reading
data content and meta-data.
Returns:
boxs.storage.Reader: A modified reader that will be used instead.
"""
return reader
transform_writer(self, writer)
🔗
Transform a given writer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
writer |
boxs.storage.Writer |
Writer object that is used for writing new data content and meta-data. |
required |
Returns:
Type | Description |
---|---|
boxs.storage.Writer |
A modified writer that will be used instead. |
Source code in boxs/transform.py
def transform_writer(self, writer):
"""
Transform a given writer.
Args:
writer (boxs.storage.Writer): Writer object that is used for writing
new data content and meta-data.
Returns:
boxs.storage.Writer: A modified writer that will be used instead.
"""
return writer
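For instance, a transformer might stamp an extra meta-data entry onto every stored item by mutating the writer it receives. This sketch uses a bare stand-in writer rather than the real `Writer` class; the class and attribute names are assumptions:

```python
class MetaTagTransformer:
    """Hypothetical transformer that adds a meta-data entry on write."""
    def transform_writer(self, writer):
        # Assumption: writer.meta is a mutable dict, as in the Writer base class.
        writer.meta['transformed_by'] = 'MetaTagTransformer'
        return writer
    def transform_reader(self, reader):
        # Nothing to change when reading in this sketch.
        return reader

class FakeWriter:
    """Minimal stand-in exposing only the meta dict."""
    def __init__(self):
        self.meta = {}

writer = MetaTagTransformer().transform_writer(FakeWriter())
assert writer.meta == {'transformed_by': 'MetaTagTransformer'}
```

A transformer that needs to rewrite the data stream itself would instead return a `DelegatingWriter`/`DelegatingReader` subclass that wraps `as_stream()`.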
value_types
🔗
Types for reading and writing of different value types
BytesValueType (ValueType)
🔗
A ValueType for reading and writing bytes/bytearray values.
Source code in boxs/value_types.py
class BytesValueType(ValueType):
"""
A ValueType for reading and writing bytes/bytearray values.
"""
def supports(self, value):
return isinstance(value, (bytes, bytearray))
def write_value_to_writer(self, value, writer):
source_stream = io.BytesIO(value)
with writer.as_stream() as destination_stream:
shutil.copyfileobj(source_stream, destination_stream)
def read_value_from_reader(self, reader):
with reader.as_stream() as stream:
return stream.read()
read_value_from_reader(self, reader)
🔗
Read a value from the reader.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reader |
boxs.storage.Reader |
The reader from which the value should be read. |
required |
Returns:
Type | Description |
---|---|
Any |
The value that was read from the reader. |
Source code in boxs/value_types.py
def read_value_from_reader(self, reader):
with reader.as_stream() as stream:
return stream.read()
supports(self, value)
🔗
Returns whether the value type can be used for reading and writing the given value.
This method is used to determine if a value can be read and written by a value type. It is only needed if the value type should be picked up automatically; if it is used explicitly, no check is performed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value for which the value type should be checked. |
required |
Returns:
Type | Description |
---|---|
bool |
`True` if the value type supports the given value, otherwise `False`. |
Source code in boxs/value_types.py
def supports(self, value):
return isinstance(value, (bytes, bytearray))
write_value_to_writer(self, value, writer)
🔗
Write the given value to the writer.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be written. |
required |
writer |
boxs.storage.Writer |
The writer into which the value should be written. |
required |
Source code in boxs/value_types.py
def write_value_to_writer(self, value, writer):
source_stream = io.BytesIO(value)
with writer.as_stream() as destination_stream:
shutil.copyfileobj(source_stream, destination_stream)
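The value type can be exercised end-to-end with minimal stand-ins for the writer and reader. The `FakeWriter`/`FakeReader` classes below are illustrative only; the small `BytesIO` subclass keeps the written bytes around after the `with` block closes the stream:

```python
import io
import shutil

class BytesValueType:
    """Copy of the value type above, exercised with in-memory stand-ins."""
    def supports(self, value):
        return isinstance(value, (bytes, bytearray))
    def write_value_to_writer(self, value, writer):
        source_stream = io.BytesIO(value)
        with writer.as_stream() as destination_stream:
            shutil.copyfileobj(source_stream, destination_stream)
    def read_value_from_reader(self, reader):
        with reader.as_stream() as stream:
            return stream.read()

class _Buffer(io.BytesIO):
    # Keep the written bytes accessible even after the stream is closed.
    def close(self):
        self.value = self.getvalue()
        super().close()

class FakeWriter:
    def __init__(self):
        self.stream = _Buffer()
    def as_stream(self):
        return self.stream

class FakeReader:
    def __init__(self, data):
        self._data = data
    def as_stream(self):
        return io.BytesIO(self._data)

value_type = BytesValueType()
writer = FakeWriter()
value_type.write_value_to_writer(b'raw bytes', writer)
assert writer.stream.value == b'raw bytes'
assert value_type.read_value_from_reader(FakeReader(b'raw bytes')) == b'raw bytes'
assert value_type.supports(b'x') and not value_type.supports('x')
```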
DirectoryValueType (ValueType)
🔗
A ValueType for reading and writing directories.
The values have to be instances of `pathlib.Path` and must point to an existing directory. Everything within this directory is then added to a new zip archive that is written to the storage.
Source code in boxs/value_types.py
class DirectoryValueType(ValueType):
"""
A ValueType for reading and writing directories.
The values have to be instances of `pathlib.Path` and must point to an existing
directory. Everything within this directory is then added to a new zip archive,
that is written to the storage.
"""
def __init__(self, dir_path=None):
self._dir_path = dir_path
super().__init__()
def supports(self, value):
return isinstance(value, pathlib.Path) and value.exists() and value.is_dir()
def write_value_to_writer(self, value, writer):
def _add_directory(root, directory, _zip_file):
for path in directory.iterdir():
if path.is_file():
_zip_file.write(path, arcname=path.relative_to(root))
if path.is_dir():
_add_directory(root, path, _zip_file)
with writer.as_stream() as destination_stream, zipfile.ZipFile(
destination_stream, mode='w'
) as zip_file:
_add_directory(value, value, zip_file)
def read_value_from_reader(self, reader):
dir_path = self._dir_path
if self._dir_path is None:
dir_path = tempfile.mkdtemp()
dir_path = pathlib.Path(dir_path)
self._logger.debug("Directory will be stored in %s", dir_path)
with reader.as_stream() as read_stream, zipfile.ZipFile(
read_stream, 'r'
) as zip_file:
for zip_info in zip_file.infolist():
target_path = dir_path / zip_info.filename
self._logger.debug(
"Extracting %s to %s", zip_info.filename, target_path
)
zip_file.extract(zip_info, dir_path)
return dir_path
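The zip round-trip can be demonstrated with the standard library alone. The directory layout below is made up for illustration; archiving uses root-relative names as `write_value_to_writer` does, and extraction unpacks into a fresh target directory:

```python
import io
import pathlib
import tempfile
import zipfile

# Build a small directory tree to archive.
source = pathlib.Path(tempfile.mkdtemp())
(source / 'sub').mkdir()
(source / 'a.txt').write_text('hello')
(source / 'sub' / 'b.txt').write_text('world')

# Zip it with paths relative to the root, mirroring write_value_to_writer.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, mode='w') as zip_file:
    for path in sorted(source.rglob('*')):
        if path.is_file():
            zip_file.write(path, arcname=path.relative_to(source))

# Extract into a fresh directory, mirroring read_value_from_reader.
destination = pathlib.Path(tempfile.mkdtemp())
buffer.seek(0)
with zipfile.ZipFile(buffer, 'r') as zip_file:
    zip_file.extractall(destination)

assert (destination / 'a.txt').read_text() == 'hello'
assert (destination / 'sub' / 'b.txt').read_text() == 'world'
```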
read_value_from_reader(self, reader)
🔗
Read a value from the reader.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reader |
boxs.storage.Reader |
The reader from which the value should be read. |
required |
Returns:
Type | Description |
---|---|
Any |
The value that was read from the reader. |
Source code in boxs/value_types.py
def read_value_from_reader(self, reader):
    dir_path = self._dir_path
    if self._dir_path is None:
        dir_path = tempfile.mkdtemp()
    dir_path = pathlib.Path(dir_path)
    self._logger.debug("Directory will be stored in %s", dir_path)
    with reader.as_stream() as read_stream, zipfile.ZipFile(
        read_stream, 'r'
    ) as zip_file:
        for zip_info in zip_file.infolist():
            target_path = dir_path / zip_info.filename
            self._logger.debug(
                "Extracting %s to %s", zip_info.filename, target_path
            )
            zip_file.extract(zip_info, target_path)
    return dir_path
supports(self, value)
🔗
Returns whether the value type can be used for reading and writing the given value.
This method is used to determine if a value can be read and written by a value type. It is only necessary if the value type should be picked up automatically; if the value type is only used explicitly, no check is performed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value for which the value type should be checked. |
required |
Returns:
Type | Description |
---|---|
bool |
`True` if the value type supports this value, otherwise `False`. |
Source code in boxs/value_types.py
def supports(self, value):
    return isinstance(value, pathlib.Path) and value.exists() and value.is_dir()
write_value_to_writer(self, value, writer)
🔗
Write the given value to the writer.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be written. |
required |
writer |
boxs.storage.Writer |
The writer into which the value should be written. |
required |
Source code in boxs/value_types.py
def write_value_to_writer(self, value, writer):
    def _add_directory(root, directory, _zip_file):
        for path in directory.iterdir():
            if path.is_file():
                _zip_file.write(path, arcname=path.relative_to(root))
            if path.is_dir():
                _add_directory(root, path, _zip_file)

    with writer.as_stream() as destination_stream, zipfile.ZipFile(
        destination_stream, mode='w'
    ) as zip_file:
        _add_directory(value, value, zip_file)
FileValueType (ValueType)
🔗
A ValueType for reading and writing files.
The values have to be instances of `pathlib.Path`.
Source code in boxs/value_types.py
class FileValueType(ValueType):
    """
    A ValueType for reading and writing files.

    The values have to be instances of `pathlib.Path`.
    """

    def __init__(self, file_path=None):
        self._file_path = file_path
        super().__init__()

    def supports(self, value):
        return isinstance(value, pathlib.Path) and value.exists() and value.is_file()

    def write_value_to_writer(self, value, writer):
        with value.open('rb') as file_reader, writer.as_stream() as destination_stream:
            shutil.copyfileobj(file_reader, destination_stream)

    def read_value_from_reader(self, reader):
        if hasattr(reader, 'as_file'):
            self._logger.debug("Reader has as_file()")
            if self._file_path:
                self._logger.debug("Copying file directly")
                shutil.copyfile(str(reader.as_file()), str(self._file_path))
                return self._file_path
            return reader.as_file()
        file_path = self._file_path
        if self._file_path is None:
            file_path = tempfile.mktemp()
        file_path = pathlib.Path(file_path)
        with reader.as_stream() as read_stream, io.FileIO(
            file_path, 'w'
        ) as file_stream:
            self._logger.debug("Writing file from stream")
            shutil.copyfileobj(read_stream, file_stream)
        return file_path
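A minimal sketch of the write path, using an in-memory stand-in for `boxs.storage.Writer` (the `InMemoryWriter` stub is illustrative, not part of boxs):

```python
import contextlib
import io
import pathlib
import shutil
import tempfile


class InMemoryWriter:
    """Illustrative stand-in for boxs.storage.Writer."""
    def __init__(self):
        self.buffer = io.BytesIO()
        self.meta = {}

    @contextlib.contextmanager
    def as_stream(self):
        yield self.buffer


# Mirror FileValueType.write_value_to_writer: stream the file's raw bytes
# into the writer without loading them into memory all at once.
source = pathlib.Path(tempfile.mkdtemp()) / 'data.bin'
source.write_bytes(b'payload')

writer = InMemoryWriter()
with source.open('rb') as file_reader, writer.as_stream() as destination:
    shutil.copyfileobj(file_reader, destination)

stored = writer.buffer.getvalue()
```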
read_value_from_reader(self, reader)
🔗
Read a value from the reader.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reader |
boxs.storage.Reader |
The reader from which the value should be read. |
required |
Returns:
Type | Description |
---|---|
Any |
The value that was read from the reader. |
Source code in boxs/value_types.py
def read_value_from_reader(self, reader):
    if hasattr(reader, 'as_file'):
        self._logger.debug("Reader has as_file()")
        if self._file_path:
            self._logger.debug("Copying file directly")
            shutil.copyfile(str(reader.as_file()), str(self._file_path))
            return self._file_path
        return reader.as_file()
    file_path = self._file_path
    if self._file_path is None:
        file_path = tempfile.mktemp()
    file_path = pathlib.Path(file_path)
    with reader.as_stream() as read_stream, io.FileIO(
        file_path, 'w'
    ) as file_stream:
        self._logger.debug("Writing file from stream")
        shutil.copyfileobj(read_stream, file_stream)
    return file_path
supports(self, value)
🔗
Returns whether the value type can be used for reading and writing the given value.
This method is used to determine if a value can be read and written by a value type. It is only necessary if the value type should be picked up automatically; if the value type is only used explicitly, no check is performed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value for which the value type should be checked. |
required |
Returns:
Type | Description |
---|---|
bool |
`True` if the value type supports this value, otherwise `False`. |
Source code in boxs/value_types.py
def supports(self, value):
    return isinstance(value, pathlib.Path) and value.exists() and value.is_file()
write_value_to_writer(self, value, writer)
🔗
Write the given value to the writer.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be written. |
required |
writer |
boxs.storage.Writer |
The writer into which the value should be written. |
required |
Source code in boxs/value_types.py
def write_value_to_writer(self, value, writer):
    with value.open('rb') as file_reader, writer.as_stream() as destination_stream:
        shutil.copyfileobj(file_reader, destination_stream)
JsonValueType (ValueType)
🔗
ValueType for storing values as JSON.
Source code in boxs/value_types.py
class JsonValueType(ValueType):
    """
    ValueType for storing values as JSON.
    """

    def supports(self, value):
        return isinstance(value, (dict, list))

    def write_value_to_writer(self, value, writer):
        writer.meta['media_type'] = 'application/json'
        with writer.as_stream() as destination_stream, io.TextIOWrapper(
            destination_stream
        ) as text_writer:
            json.dump(value, text_writer, sort_keys=True, separators=(',', ':'))

    def read_value_from_reader(self, reader):
        with reader.as_stream() as stream:
            return json.load(stream)
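Because writing uses `sort_keys=True` and compact separators, equal dicts always serialize to the same bytes, which keeps stored JSON artifacts comparable across runs. A standalone sketch of the serialization step:

```python
import io
import json

value = {'b': 2, 'a': 1}

# Serialize the way JsonValueType does: sorted keys, no whitespace.
buffer = io.BytesIO()
with io.TextIOWrapper(buffer) as text_writer:
    json.dump(value, text_writer, sort_keys=True, separators=(',', ':'))
    text_writer.flush()
    # Capture before the wrapper closes (and thereby closes) the buffer.
    serialized = buffer.getvalue()

restored = json.loads(serialized)
```

Note that `io.TextIOWrapper` closes the underlying buffer when the `with` block exits, which is why the bytes are captured inside the block after an explicit flush.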
read_value_from_reader(self, reader)
🔗
Read a value from the reader.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reader |
boxs.storage.Reader |
The reader from which the value should be read. |
required |
Returns:
Type | Description |
---|---|
Any |
The value that was read from the reader. |
Source code in boxs/value_types.py
def read_value_from_reader(self, reader):
    with reader.as_stream() as stream:
        return json.load(stream)
supports(self, value)
🔗
Returns whether the value type can be used for reading and writing the given value.
This method is used to determine if a value can be read and written by a value type. It is only necessary if the value type should be picked up automatically; if the value type is only used explicitly, no check is performed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value for which the value type should be checked. |
required |
Returns:
Type | Description |
---|---|
bool |
|
Source code in boxs/value_types.py
def supports(self, value):
    return isinstance(value, (dict, list))
write_value_to_writer(self, value, writer)
🔗
Write the given value to the writer.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be written. |
required |
writer |
boxs.storage.Writer |
The writer into which the value should be written. |
required |
Source code in boxs/value_types.py
def write_value_to_writer(self, value, writer):
    writer.meta['media_type'] = 'application/json'
    with writer.as_stream() as destination_stream, io.TextIOWrapper(
        destination_stream
    ) as text_writer:
        json.dump(value, text_writer, sort_keys=True, separators=(',', ':'))
StreamValueType (ValueType)
🔗
A ValueType for reading and writing from and to a stream.
Source code in boxs/value_types.py
class StreamValueType(ValueType):
    """
    A ValueType for reading and writing from and to a stream.
    """

    def supports(self, value):
        return isinstance(value, io.IOBase)

    def write_value_to_writer(self, value, writer):
        with writer.as_stream() as destination_stream:
            shutil.copyfileobj(value, destination_stream)

    def read_value_from_reader(self, reader):
        return reader.as_stream()
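A short standalone sketch of what this type does: any `io.IOBase` instance is accepted, and its bytes are copied verbatim without interpretation:

```python
import io
import shutil

source = io.BytesIO(b'chunked bytes')

# StreamValueType.supports() accepts any io.IOBase subclass.
is_supported = isinstance(source, io.IOBase)

# The write path is a plain chunked copy, so arbitrarily large
# streams pass through without being held fully in memory.
destination = io.BytesIO()
shutil.copyfileobj(source, destination)
copied = destination.getvalue()
```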
read_value_from_reader(self, reader)
🔗
Read a value from the reader.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reader |
boxs.storage.Reader |
The reader from which the value should be read. |
required |
Returns:
Type | Description |
---|---|
Any |
The value that was read from the reader. |
Source code in boxs/value_types.py
def read_value_from_reader(self, reader):
    return reader.as_stream()
supports(self, value)
🔗
Returns whether the value type can be used for reading and writing the given value.
This method is used to determine if a value can be read and written by a value type. It is only necessary if the value type should be picked up automatically; if the value type is only used explicitly, no check is performed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value for which the value type should be checked. |
required |
Returns:
Type | Description |
---|---|
bool |
`True` if the value type supports this value, otherwise `False`. |
Source code in boxs/value_types.py
def supports(self, value):
    return isinstance(value, io.IOBase)
write_value_to_writer(self, value, writer)
🔗
Write the given value to the writer.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be written. |
required |
writer |
boxs.storage.Writer |
The writer into which the value should be written. |
required |
Source code in boxs/value_types.py
def write_value_to_writer(self, value, writer):
    with writer.as_stream() as destination_stream:
        shutil.copyfileobj(value, destination_stream)
StringValueType (ValueType)
🔗
A ValueType for reading and writing string values.
The ValueType can use different encodings via its constructor argument, but defaults to 'utf-8'.
Source code in boxs/value_types.py
class StringValueType(ValueType):
    """
    A ValueType for reading and writing string values.

    The ValueType can use different encodings via its constructor argument, but
    defaults to 'utf-8'.
    """

    def __init__(self, default_encoding='utf-8'):
        self._default_encoding = default_encoding
        super().__init__()

    def supports(self, value):
        return isinstance(value, str)

    def write_value_to_writer(self, value, writer):
        source_stream = io.BytesIO(value.encode(self._default_encoding))
        writer.meta['encoding'] = self._default_encoding
        with writer.as_stream() as destination_stream:
            shutil.copyfileobj(source_stream, destination_stream)

    def read_value_from_reader(self, reader):
        encoding = reader.meta.get('encoding', self._default_encoding)
        self._logger.debug("Reading string with encoding %s", encoding)
        with reader.as_stream() as stream, io.TextIOWrapper(
            stream, encoding=encoding
        ) as text_reader:
            return text_reader.read()

    def _get_parameter_string(self):
        return self._default_encoding

    @classmethod
    def _from_parameter_string(cls, parameters):
        return cls(default_encoding=parameters)
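The encoding round-trip can be sketched with an in-memory stand-in for the writer and reader (the `InMemoryEndpoint` stub is illustrative, not part of boxs): the encoding used for writing is recorded in the metadata, and on reading the stored `'encoding'` entry takes precedence over the default.

```python
import contextlib
import io
import shutil


class InMemoryEndpoint:
    """Illustrative stand-in for boxs.storage Writer/Reader."""
    def __init__(self, data=b''):
        self.buffer = io.BytesIO(data)
        self.meta = {}

    @contextlib.contextmanager
    def as_stream(self):
        self.buffer.seek(0)
        yield self.buffer


encoding = 'latin-1'
value = 'Grüße'

# Write: encode with the configured encoding and record it in the metadata.
writer = InMemoryEndpoint()
writer.meta['encoding'] = encoding
with writer.as_stream() as destination:
    shutil.copyfileobj(io.BytesIO(value.encode(encoding)), destination)

# Read: the stored 'encoding' entry wins over the 'utf-8' default,
# so data written with a non-default encoding still decodes correctly.
reader = InMemoryEndpoint(writer.buffer.getvalue())
reader.meta = dict(writer.meta)
with reader.as_stream() as stream:
    restored = stream.read().decode(reader.meta.get('encoding', 'utf-8'))
```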
read_value_from_reader(self, reader)
🔗
Read a value from the reader.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reader |
boxs.storage.Reader |
The reader from which the value should be read. |
required |
Returns:
Type | Description |
---|---|
Any |
The value that was read from the reader. |
Source code in boxs/value_types.py
def read_value_from_reader(self, reader):
    encoding = reader.meta.get('encoding', self._default_encoding)
    self._logger.debug("Reading string with encoding %s", encoding)
    with reader.as_stream() as stream, io.TextIOWrapper(
        stream, encoding=encoding
    ) as text_reader:
        return text_reader.read()
supports(self, value)
🔗
Returns whether the value type can be used for reading and writing the given value.
This method is used to determine if a value can be read and written by a value type. It is only necessary if the value type should be picked up automatically; if the value type is only used explicitly, no check is performed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value for which the value type should be checked. |
required |
Returns:
Type | Description |
---|---|
bool |
`True` if the value type supports this value, otherwise `False`. |
Source code in boxs/value_types.py
def supports(self, value):
    return isinstance(value, str)
write_value_to_writer(self, value, writer)
🔗
Write the given value to the writer.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be written. |
required |
writer |
boxs.storage.Writer |
The writer into which the value should be written. |
required |
Source code in boxs/value_types.py
def write_value_to_writer(self, value, writer):
    source_stream = io.BytesIO(value.encode(self._default_encoding))
    writer.meta['encoding'] = self._default_encoding
    with writer.as_stream() as destination_stream:
        shutil.copyfileobj(source_stream, destination_stream)
ValueType (ABC)
🔗
Base class for implementing the type-dependent reading and writing of values to and from Readers and Writers.
Source code in boxs/value_types.py
class ValueType(abc.ABC):
    """
    Base class for implementing the type-dependent reading and writing of values to
    and from Readers and Writers.
    """

    def __init__(self):
        self._logger = logging.getLogger(str(self.__class__))

    def supports(self, value):  # pylint: disable=unused-argument,no-self-use
        """
        Returns whether the value type can be used for reading and writing the given
        value.

        This method is used to determine if a value can be read and written by a value
        type. It is only necessary if the value type should be picked up
        automatically. If it is only used explicitly, no check is performed.

        Args:
            value (Any): The value for which the value type should be checked.

        Returns:
            bool: `True` if the value type supports this value, otherwise `False`.
                The default implementation just returns `False`.
        """
        return False

    @abc.abstractmethod
    def write_value_to_writer(self, value, writer):
        """
        Write the given value to the writer.

        This method needs to be implemented by the specific value type implementations
        that take care of the necessary type conversions.

        Args:
            value (Any): The value that should be written.
            writer (boxs.storage.Writer): The writer into which the value should be
                written.
        """

    @abc.abstractmethod
    def read_value_from_reader(self, reader):
        """
        Read a value from the reader.

        This method needs to be implemented by the specific value type implementations
        that take care of the necessary type conversions.

        Args:
            reader (boxs.storage.Reader): The reader from which the value should be
                read.

        Returns:
            Any: The value that was read from the reader.
        """

    def get_specification(self):
        """
        Returns a string that specifies this ValueType.

        Returns:
            str: The specification that can be used for recreating this specific
                ValueType.
        """
        module_name = self.__class__.__module__
        class_name = self.__class__.__qualname__
        parameter_string = self._get_parameter_string()
        return ':'.join([module_name, class_name, parameter_string])

    @classmethod
    def from_specification(cls, specification):
        """
        Create a new ValueType instance from its specification string.

        Args:
            specification (str): The specification string that specifies the ValueType
                that should be instantiated.

        Returns:
            ValueType: The specified ValueType instance.
        """
        logger.debug("Recreating value type from specification %s", specification)
        module_name, class_name, parameter_string = specification.split(':', maxsplit=2)
        module = importlib.import_module(module_name)
        class_ = getattr(module, class_name)
        value_type = class_._from_parameter_string(  # pylint: disable=protected-access
            parameter_string,
        )
        return value_type

    def _get_parameter_string(self):  # pylint: disable=no-self-use
        """
        Return a string encoding the ValueType specific parameters.

        This method needs to be overridden by subclasses that use parameters.

        Returns:
            str: The string containing the parameters.
        """
        return ''

    @classmethod
    def _from_parameter_string(cls, parameters):  # pylint: disable=unused-argument
        """
        Return a new instance of a specific ValueType from its parameter string.

        This method needs to be overridden by subclasses that use parameters.

        Returns:
            ValueType: The specified ValueType instance.
        """
        return cls()

    def __repr__(self):
        return self.get_specification()

    def __str__(self):
        return self.get_specification()
from_specification(specification)
classmethod
🔗
Create a new ValueType instance from its specification string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
specification |
str |
The specification string that specifies the ValueType that should be instantiated. |
required |
Returns:
Type | Description |
---|---|
ValueType |
The specified ValueType instance. |
Source code in boxs/value_types.py
@classmethod
def from_specification(cls, specification):
    """
    Create a new ValueType instance from its specification string.

    Args:
        specification (str): The specification string that specifies the ValueType
            that should be instantiated.

    Returns:
        ValueType: The specified ValueType instance.
    """
    logger.debug("Recreating value type from specification %s", specification)
    module_name, class_name, parameter_string = specification.split(':', maxsplit=2)
    module = importlib.import_module(module_name)
    class_ = getattr(module, class_name)
    value_type = class_._from_parameter_string(  # pylint: disable=protected-access
        parameter_string,
    )
    return value_type
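The same lookup mechanism can be demonstrated with a standard-library class; the specification format is `module:qualname:parameters`, and `maxsplit=2` keeps any `:` inside the parameter part intact:

```python
import importlib

# Parse a specification string the way from_specification does.
# 'collections:OrderedDict:' stands in for a real boxs specification
# such as 'boxs.value_types:StringValueType:utf-8'.
specification = 'collections:OrderedDict:'
module_name, class_name, parameter_string = specification.split(':', maxsplit=2)

# Resolve the class dynamically from the parsed module and class names.
module = importlib.import_module(module_name)
class_ = getattr(module, class_name)
```

An empty parameter string corresponds to a ValueType without parameters, for which the base `_from_parameter_string` simply calls the no-argument constructor.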
get_specification(self)
🔗
Returns a string that specifies this ValueType.
Returns:
Type | Description |
---|---|
str |
The specification that can be used for recreating this specific ValueType. |
Source code in boxs/value_types.py
def get_specification(self):
    """
    Returns a string that specifies this ValueType.

    Returns:
        str: The specification that can be used for recreating this specific
            ValueType.
    """
    module_name = self.__class__.__module__
    class_name = self.__class__.__qualname__
    parameter_string = self._get_parameter_string()
    return ':'.join([module_name, class_name, parameter_string])
read_value_from_reader(self, reader)
🔗
Read a value from the reader.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reader |
boxs.storage.Reader |
The reader from which the value should be read. |
required |
Returns:
Type | Description |
---|---|
Any |
The value that was read from the reader. |
Source code in boxs/value_types.py
@abc.abstractmethod
def read_value_from_reader(self, reader):
    """
    Read a value from the reader.

    This method needs to be implemented by the specific value type implementations
    that take care of the necessary type conversions.

    Args:
        reader (boxs.storage.Reader): The reader from which the value should be
            read.

    Returns:
        Any: The value that was read from the reader.
    """
supports(self, value)
🔗
Returns whether the value type can be used for reading and writing the given value.
This method is used to determine if a value can be read and written by a value type. It is only necessary if the value type should be picked up automatically; if the value type is only used explicitly, no check is performed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value for which the value type should be checked. |
required |
Returns:
Type | Description |
---|---|
bool |
`True` if the value type supports this value, otherwise `False`. |
Source code in boxs/value_types.py
def supports(self, value):  # pylint: disable=unused-argument,no-self-use
    """
    Returns whether the value type can be used for reading and writing the given
    value.

    This method is used to determine if a value can be read and written by a value
    type. It is only necessary if the value type should be picked up
    automatically. If it is only used explicitly, no check is performed.

    Args:
        value (Any): The value for which the value type should be checked.

    Returns:
        bool: `True` if the value type supports this value, otherwise `False`.
            The default implementation just returns `False`.
    """
    return False
write_value_to_writer(self, value, writer)
🔗
Write the given value to the writer.
This method needs to be implemented by the specific value type implementations that take care of the necessary type conversions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value that should be written. |
required |
writer |
boxs.storage.Writer |
The writer into which the value should be written. |
required |
Source code in boxs/value_types.py
@abc.abstractmethod
def write_value_to_writer(self, value, writer):
    """
    Write the given value to the writer.

    This method needs to be implemented by the specific value type implementations
    that take care of the necessary type conversions.

    Args:
        value (Any): The value that should be written.
        writer (boxs.storage.Writer): The writer into which the value should be
            written.
    """