Skip to content

API

Measure resource usage of your python code

measure 🔗

Contains Measure class

Measure 🔗

Class that represents a metric that is gathered on a subject.

Attributes:

Name Type Description
key str

The (unique) key that is used for referencing a measure. The key is derived from the keys of the subject and the metric in the form <subject-key>.<metric-key>.

subject multimeter.subject.Subject

The subject on which the measure gathers data.

metric multimeter.metric.Metric

The metric which is gathered.

measurement 🔗

Measuring run-time properties

Measurement 🔗

Class that is used for a single series of measurement.

Attributes:

Name Type Description
identifier str

The (unique) identifier for this series of measurement.

tags Dict[str,str]

A set of user-defined tags that can be used for later differentiation between different series.

result multimeter.result.Result

The result of the series of measurement containing the measured values.

__init__(self, multimeter, identifier, **tags) special 🔗

Creates a new measurement.

Parameters:

Name Type Description Default
multimeter multimeter.multimeter.Multimeter

The multimeter whose configuration should be used for this measurement.

required
identifier

The (unique) identifier for this series of measurement. If None, a unique identifier is generated from the current time.

required
**tags Dict[str,str]

A set of user-defined tags that can be used for later differentiation between different measurements.

{}
Source code in multimeter/measurement.py
def __init__(self, multimeter, identifier, **tags):
    """
    Creates a new measurement.

    Args:
        multimeter (multimeter.multimeter.Multimeter): The multimeter whose
            configuration should be used for this measurement.
        identifier: The (unique) identifier for this series of measurement. If
            `None`, a unique identifier is generated from the current time.
        **tags (Dict[str,str]): A set of user-defined tags that can be used
            for later differentiation between different measurements.
    """
    self.identifier = identifier or str(time.time())
    self.tags = tags
    self.result = None
    self._probes = multimeter.probes
    self._storage = multimeter.storage
    self._thread = _MeasuringThread(self._sample, cycle_time=multimeter.cycle_time)
    self._time_last_sample = None

add_mark(self, label) 🔗

Add a new mark for the current time.

Parameters:

Name Type Description Default
label str

The label of the mark.

required
Source code in multimeter/measurement.py
def add_mark(self, label):
    """
    Add a new mark for the current time.

    Args:
        label (str): The label of the mark.
    """
    timestamp = datetime.datetime.now(datetime.timezone.utc)
    self.result.add_mark(timestamp, label)

end(self) 🔗

Ends the process of gathering run-time property values.

Source code in multimeter/measurement.py
def end(self):
    """
    Ends the process of gathering run-time property values.

    """
    self._thread.end()
    self._thread.join()
    for probe in self._probes:
        probe.end()
    self._storage.store(self.result)

start(self) 🔗

Starts the process of gathering run-time property values.

Returns:

Type Description
multimeter.result.Result

The result of this series of measurement.

Source code in multimeter/measurement.py
def start(self):
    """
    Starts the process of gathering run-time property values.

    Returns:
        multimeter.result.Result: The result of this series of measurement.
    """
    self.result = Result(*self._probes, identifier=self.identifier, tags=self.tags)
    self._time_last_sample = time.monotonic()
    for probe in self._probes:
        probe.start()
    self._thread.start()
    return self.result

metric 🔗

Metrics that define the type of measured values

Metric 🔗

Class representing a metric describing the type of measured values.

Attributes:

Name Type Description
key str

The (unique) key that is used for referencing a metric.

description str

A human readable description of the metric. Defaults to the empty string "".

unit str

The unit in which the value of metric is expressed. Defaults to the empty string "".

value_type type

The python type used for the values of this metric. Defaults to float.

min_value value_type

The minimum value that this metric can produce. Defaults to 0.0, but can be set to None to disable limitation.

max_value value_type

The maximum value that this metric can produce. Defaults to None which disables limitation.

limit_value(self, value) 🔗

Make sure the value is in the limits of this metric.

If the given value is larger than max_value or smaller than min_value the value is capped.

Parameters:

Name Type Description Default
value value_type

The value tp limit.

required

Returns:

Type Description
value_type

The limited value.

Source code in multimeter/metric.py
def limit_value(self, value):
    """
    Make sure the value is in the limits of this metric.

    If the given value is larger than `max_value` or smaller than `min_value` the
    value is capped.

    Args:
        value (value_type): The value tp limit.

    Returns:
        value_type: The limited value.
    """
    if self.min_value is not None:
        value = max(self.min_value, value)
    if self.max_value is not None:
        value = min(self.max_value, value)
    return value

multimeter 🔗

The module defining the Multimeter class

Multimeter 🔗

A Multimeter object allows to measure run-time properties of python code.

The multi meter uses multimeter.probe.Probe objects that are used for actually measuring some metrics on individual subjects. Those probes are defined, when a new instance of Multimeter is created.

A new series of measurement is created when the method measure() is being called. The measuring process itself is started by calling start() and runs until it is ended by calling end(). The measuring happens in a different thread so that it runs in the background automatically. When the measurement is ended, its multimeter.result.Result can be retrieved from the measurement itself.

Examples:

>>> multimeter = Multimeter(ResourceProbe())
>>> measurement = multimeter.measure()
>>> measurement.start()
>>> here_my_code_to_be_measured()
...
>>> measurement.end()
>>> result = measurement.result

Alternatively, Measurement can be used as a context manager, that returns the result:

Examples:

>>> multimeter = Multimeter(ResourceProbe())
>>> with multimeter.measure() as result:
...     here_my_code_to_be_measured()
...

cycle_time property readonly 🔗

Returns the cycle time in seconds, how often data is gathered.

probes property readonly 🔗

Returns the probes that gather the data.

storage property readonly 🔗

Returns the storage.

__init__(self, *probes, *, cycle_time=1.0, storage=DummyStorage()) special 🔗

Create a new multi meter.

Parameters:

Name Type Description Default
*probes multimeter.probe.Probe

A list of probes that measure.

()
cycle_time float

The time in seconds between two measure points. Defaults to 1.0. which means measure every second.

1.0
storage multimeter.storage.Storage

A instance of Storage that takes care of storing the final result. Defaults to an implementation that doesn't do anything.

DummyStorage()
Source code in multimeter/multimeter.py
def __init__(self, *probes, cycle_time=1.0, storage=DummyStorage()):
    """
    Create a new multi meter.

    Args:
        *probes (multimeter.probe.Probe): A list of probes that measure.
        cycle_time (float): The time in seconds between two measure points.
            Defaults to `1.0`. which means measure every second.
        storage (multimeter.storage.Storage): A instance of `Storage` that takes
            care of storing the final result. Defaults to an implementation that
            doesn't do anything.
    """
    self._cycle_time = cycle_time
    self._probes = [*probes]
    self._storage = storage

add_probes(self, *probes) 🔗

Add probes that gather data.

Parameters:

Name Type Description Default
*probes multimeter.probe.Probe

Objects that implement the Probe protocol.

()

Returns:

Type Description
Multimeter

The multimeter.

Source code in multimeter/multimeter.py
def add_probes(self, *probes):
    """
    Add probes that gather data.

    Args:
        *probes (multimeter.probe.Probe): Objects that implement the Probe
            protocol.

    Returns:
        Multimeter: The multimeter.
    """
    self._probes.extend(probes)
    return self

measure(self, identifier=None, **tags) 🔗

Create a new measurement based on the multimeters configuration.

Parameters:

Name Type Description Default
identifier str

The (unique) identifier for this measurement.

None
**tags Dict[str,str]

Optional user-defined tags, that can be used later for identifying the measurement.

{}

Returns:

Type Description
multimeter.measurement.Measurement

The new measurement object which is used for starting and ending the process.

Source code in multimeter/multimeter.py
def measure(self, identifier=None, **tags):
    """
    Create a new measurement based on the multimeters configuration.

    Args:
        identifier (str): The (unique) identifier for this measurement.
        **tags (Dict[str,str]): Optional user-defined tags, that can be used
            later for identifying the measurement.

    Returns:
        multimeter.measurement.Measurement: The new measurement object which is
            used for starting and ending the process.
    """
    return Measurement(self, identifier, **tags)

set_cycle_time(self, cycle_time) 🔗

Sets the cycle time, how often data is gathered.

Parameters:

Name Type Description Default
cycle_time float

The time between two sequential data gatherings.

required

Returns:

Type Description
Multimeter

The multimeter.

Source code in multimeter/multimeter.py
def set_cycle_time(self, cycle_time):
    """
    Sets the cycle time, how often data is gathered.

    Args:
        cycle_time (float): The time between two sequential data gatherings.

    Returns:
        Multimeter: The multimeter.
    """
    self._cycle_time = cycle_time
    return self

set_storage(self, storage) 🔗

Sets the storage.

Parameters:

Name Type Description Default
storage multimeter.storage.Storage

The storage that stores the results.

required

Returns:

Type Description
Multimeter

The multimeter.

Source code in multimeter/multimeter.py
def set_storage(self, storage):
    """
    Sets the storage.

    Args:
        storage (multimeter.storage.Storage): The storage that stores the results.

    Returns:
        Multimeter: The multimeter.
    """
    self._storage = storage
    return self

probe 🔗

Contains the protocol for probes which gather the data

Probe (ABC) 🔗

Protocol that is used for gathering run-time properties.

measures property readonly 🔗

The measures that will be gathered.

Returns:

Type Description
tuple[multimeter.measure.Measure]

A tuple of measures from which data is gathered by this probe.

metrics property readonly 🔗

The metrics used by this probe.

Returns:

Type Description
tuple[multimeter.metric.Metric]

A tuple of metrics that are gathered by this probe.

subjects property readonly 🔗

The subjects from which measures will be gathered.

Returns:

Type Description
tuple[multimeter.subject.Subject]

A tuple of subjects from which data is gathered by this probe.

end(self) 🔗

Signals to the probe that the gathering process is ended.

Source code in multimeter/probe.py
def end(self):
    """
    Signals to the probe that the gathering process is ended.
    """

sample(self, values, time_elapsed) 🔗

Sample all measures of this probe and store their current values.

Parameters:

Name Type Description Default
values Dict[str,any]

The dictionary where the values should be stored. The probe is expected to add the values for every measure under their key.

required
time_elapsed float

Time elapsed in seconds since the last sampling.

required
Source code in multimeter/probe.py
@abc.abstractmethod
def sample(self, values, time_elapsed):
    """
    Sample all measures of this probe and store their current values.

    Args:
        values (Dict[str,any]): The dictionary where the values should be stored.
            The probe is expected to add the values for every measure under their
            key.
        time_elapsed (float): Time elapsed in seconds since the last sampling.
    """

start(self) 🔗

Signals to the probe that it should start the gathering process.

Source code in multimeter/probe.py
def start(self):
    """
    Signals to the probe that it should start the gathering process.
    """

ResourceProbe (Probe) 🔗

Probe which gathers data using the python resource module.

measures property readonly 🔗

The measures that will be gathered.

Returns:

Type Description
tuple[multimeter.measure.Measure]

A tuple of measures from which data is gathered by this probe.

metrics property readonly 🔗

The metrics used by this probe.

Returns:

Type Description
tuple[multimeter.metric.Metric]

A tuple of metrics that are gathered by this probe.

subjects property readonly 🔗

The subjects from which measures will be gathered.

Returns:

Type Description
tuple[multimeter.subject.Subject]

A tuple of subjects from which data is gathered by this probe.

sample(self, values, time_elapsed) 🔗

Sample all measures of this probe and store their current values.

Parameters:

Name Type Description Default
values Dict[str,any]

The dictionary where the values should be stored. The probe is expected to add the values for every measure under their key.

required
time_elapsed float

Time elapsed in seconds since the last sampling.

required
Source code in multimeter/probe.py
def sample(self, values, time_elapsed):
    if self._last_process_values is None or self._last_children_values is None:
        raise RuntimeError("Need to start before sampling.")

    usage = resource.getrusage(resource.RUSAGE_SELF)
    self._add_to_data(
        values, 'process.', usage, self._last_process_values, time_elapsed
    )
    self._last_process_values = usage
    usage = resource.getrusage(resource.RUSAGE_CHILDREN)
    self._add_to_data(
        values, 'children.', usage, self._last_children_values, time_elapsed
    )
    self._last_children_values = usage

start(self) 🔗

Signals to the probe that it should start the gathering process.

Source code in multimeter/probe.py
def start(self):
    self._last_process_values = resource.getrusage(resource.RUSAGE_SELF)
    self._last_children_values = resource.getrusage(resource.RUSAGE_CHILDREN)

result 🔗

Contains classes for representing the results of a measurement

Mark 🔗

Class that contains a mark for a specified time with an annotation.

Attributes:

Name Type Description
datetime datetime.datetime

The timestamp in UTC.

label str

The annotation of the mark.

Point 🔗

Class that contains the values of a single measurement point.

Attributes:

Name Type Description
datetime datetime.datetime

The timestamp in UTC.

values dict[string, any]

The

Result 🔗

Class representing results of a measurement.

Attributes:

Name Type Description
identifier str

A string, that identifies the measurement.

tags Dict[string,string]

A set of user-defined tags with arbitrary string values.

meta_data Dict[string,string]

A dictionary with meta data about the measurement.

schema multimeter.result.Schema

The schema of the values that are used in this result.

points tuple[multimeter.result.Point

A list of measurement points, which contain the measured values with their timestamp.

marks tuple[multimeter.result.Mark

A tuple of marks that were set at specific times during measuring.

duration property readonly 🔗

Returns:

Type Description
datetime.timedelta

The duration between first and last measurement or None if nothing was measured.

end property readonly 🔗

Returns:

Type Description
datetime.datetime

The timestamp of the last measurement or None if nothing was measured.

marks property readonly 🔗

Returns a tuple containing the marks

points property readonly 🔗

Returns a tuple containing the points

start property readonly 🔗

Returns:

Type Description
datetime.datetime

The timestamp of the first measurement or None if nothing was measured.

add_mark(self, timestamp, label) 🔗

Add a new mark for the given timestamp.

Parameters:

Name Type Description Default
timestamp datetime.datetime

The timestamp when the values were sampled.

required
label str

The label of the mark.

required
Source code in multimeter/result.py
def add_mark(self, timestamp, label):
    """
    Add a new mark for the given timestamp.

    Args:
        timestamp (datetime.datetime): The timestamp when the values were sampled.
        label (str): The label of the mark.
    """
    self._marks.append(Mark(timestamp, label))

add_meta_data(self, **meta_data) 🔗

Adds meta data to the result.

Setting a value for a key the second time, will overwrite the value set the first time.

Source code in multimeter/result.py
def add_meta_data(self, **meta_data):
    """
    Adds meta data to the result.

    Setting a value for a key the second time, will overwrite the value set the
    first time.

    Keyword Args:
        **meta_data: The set of meta data in form of keyword args that should be
            added to the result.
    """
    self.meta_data.update(**meta_data)

append(self, timestamp, values) 🔗

Add a new point with measured values to the result.

Parameters:

Name Type Description Default
timestamp datetime.datetime

The timestamp when the values were sampled.

required
values Dict[str,any]

The values.

required
Source code in multimeter/result.py
def append(self, timestamp, values):
    """
    Add a new point with measured values to the result.

    Args:
        timestamp (datetime.datetime): The timestamp when the values were sampled.
        values (Dict[str,any]): The values.
    """
    self._points.append(Point(timestamp, values))

values(self, key) 🔗

Returns all values of a specific measure.

Parameters:

Name Type Description Default
key str

Key of the measure for which all values should be returned.

required

Yields

any: A value of the type defined by the metric.

Source code in multimeter/result.py
def values(self, key):
    """
    Returns all values of a specific measure.

    Args:
        key (str): Key of the measure for which all values should be returned.

    Yields:
        any: A value of the type defined by the metric.
    """
    return (point.values[key] for point in self._points)

Schema 🔗

Class that represents the schema of the values in a result.

The schema consists of the used metrics, the subjects of the measures and the measures, which define which metric is measured on which subject.

Attributes:

Name Type Description
metrics tuple[multimeter.metric.Metric

A list of metrics which are measured.

subjects tuple[multimeter.subject.Subject

A list of subjects on which some metrics were measured.

measures tuple[multimeter.measure.Measure

A list of measures, which contains which metrics were measured on which subjects.

storages special 🔗

base 🔗

Base class Storage for measurement results

Storage (ABC) 🔗

Base class for implementing storages, which store the results of measurements.

store(self, result) 🔗

Store the result.

Parameters:

Name Type Description Default
result multimeter.result.Result

The result to be stored.

required
Source code in multimeter/storages/base.py
@abc.abstractmethod
def store(self, result):
    """
    Store the result.

    Args:
        result (multimeter.result.Result): The result to be stored.
    """

dummy 🔗

Storage for measurement results

DummyStorage (Storage) 🔗

Dummy Storage implementation

store(self, result) 🔗

Store the result.

Parameters:

Name Type Description Default
result multimeter.result.Result

The result to be stored.

required
Source code in multimeter/storages/dummy.py
def store(self, result):
    pass

file 🔗

Storage that stores results in files

FileFormat (ABC) 🔗

Abstract base class for the different file format implementations.

extension property readonly 🔗

Returns the file extensions for the file, e.g. '.json'.

save_result_to_stream(self, result, stream) 🔗

Saves the result to a binary stream.

Parameters:

Name Type Description Default
result multimeter.result.Result

The result.

required
stream io.RawIOBase

Binary stream where the result is written to.

required
Source code in multimeter/storages/file.py
@abc.abstractmethod
def save_result_to_stream(self, result, stream):
    """
    Saves the result to a binary stream.

    Args:
        result (multimeter.result.Result): The result.
        stream (io.RawIOBase): Binary stream where the result is written to.
    """

FileStorage (Storage) 🔗

Storage implementation that stores results as JSON files in a directory.

Attributes:

Name Type Description
save_directory pathlib.Path

The path to the directory where the results are stored.

file_format multimeter.storages.file.FileFormat

The file format to use.

save_directory property readonly 🔗

Returns the save directory.

__init__(self, save_directory, file_format) special 🔗

Creates a new file storage.

Parameters:

Name Type Description Default
save_directory Union[str,pathlib.Path]

The path to a directory, where the json files will be stored.

required
file_format FileFormat

The format to use for the file.

required
Source code in multimeter/storages/file.py
def __init__(self, save_directory, file_format):
    """
    Creates a new file storage.

    Args:
        save_directory (Union[str,pathlib.Path]): The path to a directory, where
            the json files will be stored.
        file_format (FileFormat): The format to use for the file.
    """
    if save_directory is None:
        raise ValueError("'save_directory' must be set.")
    if not isinstance(save_directory, pathlib.Path):
        save_directory = pathlib.Path(save_directory)
    self._save_directory = save_directory
    self._file_format = file_format
store(self, result) 🔗

Store the result.

Parameters:

Name Type Description Default
result multimeter.result.Result

The result to be stored.

required
Source code in multimeter/storages/file.py
def store(self, result):
    if not self._save_directory.exists():
        self._save_directory.mkdir(parents=True)
    result_file_path = self._save_directory / (
        result.identifier + self._file_format.extension
    )
    with open(result_file_path, 'bw') as stream:
        self._file_format.save_result_to_stream(result, stream)

JsonFormat (FileFormat) 🔗

Format implementation that stores results as JSON.

extension property readonly 🔗

Returns the file extensions for the file, e.g. '.json'.

save_result_to_stream(self, result, stream) 🔗

Saves the result to a (binary) stream.

Parameters:

Name Type Description Default
result multimeter.result.Result

The result to save.

required
stream io.RawIOBase

The binary stream into which the result data should be written. The stream is not automatically closed.

required
Source code in multimeter/storages/file.py
def save_result_to_stream(self, result, stream):
    """
    Saves the result to a (binary) stream.

    Args:
        result (multimeter.result.Result): The result to save.
        stream (io.RawIOBase): The binary stream into which the result data should
            be written. The stream is not automatically closed.
    """
    text_stream = io.TextIOWrapper(stream, encoding='utf-8')

    metrics_json = _SerializableGenerator(
        {
            'key': o.key,
            'description': o.description,
            'unit': o.unit,
            'value_type': o.value_type.__name__,
            'min_value': o.min_value,
            'max_value': o.max_value,
        }
        for o in result.schema.metrics
    )
    subjects_json = _SerializableGenerator(
        {
            'key': o.key,
            'description': o.description,
        }
        for o in result.schema.subjects
    )
    measures_json = _SerializableGenerator(
        {
            'key': o.key,
            'subject': o.subject.key,
            'metric': o.metric.key,
        }
        for o in result.schema.measures
    )
    points_json = _SerializableGenerator(
        {
            'datetime': o.datetime.isoformat(timespec='milliseconds'),
            'values': o.values,
        }
        for o in result.points
    )
    marks_json = _SerializableGenerator(
        {
            'datetime': o.datetime.isoformat(timespec='milliseconds'),
            'label': o.label,
        }
        for o in result.marks
    )
    result_json = {
        'identifier': result.identifier,
        'tags': result.tags,
        'meta_data': result.meta_data,
        'schema': {
            'metrics': metrics_json,
            'subjects': subjects_json,
            'measures': measures_json,
        },
        'points': points_json,
        'marks': marks_json,
    }
    json.dump(result_json, text_stream, indent=2)
    text_stream.detach()

LineFormat (FileFormat) 🔗

Format implementation that stores results as Line format.

The format is used by time series databases like InfluxDB. For more information: https://docs.influxdata.com/influxdb/v2.1/reference/syntax/line-protocol/#string

extension property readonly 🔗

Returns the file extensions for the file, e.g. '.json'.

save_result_to_stream(self, result, stream) 🔗

Saves the result to a (binary) stream.

Parameters:

Name Type Description Default
result multimeter.result.Result

The result to save.

required
stream io.RawIOBase

The binary stream into which the result data should be written. The stream is not automatically closed.

required
Source code in multimeter/storages/file.py
def save_result_to_stream(self, result, stream):
    """
    Saves the result to a (binary) stream.

    Args:
        result (multimeter.result.Result): The result to save.
        stream (io.RawIOBase): The binary stream into which the result data should
            be written. The stream is not automatically closed.
    """
    self._check_measurement_identifier(result.identifier)
    text_stream = io.TextIOWrapper(stream, encoding='utf-8')
    tags = ''.join(
        f",{self._escape_tag(key)}={self._escape_tag(value)}"
        for key, value in result.tags.items()
    )
    identifier = self._escape_identifier(result.identifier)
    for point in result.points:
        values = ','.join(
            f'{self._escape_field_key(key)}={self._value_as_string(value)}'
            for key, value in point.values.items()
        )
        # convert from seconds to nano
        timestamp = int(point.datetime.timestamp() * 1000 * 1000 * 1000)
        text_stream.write(f'{identifier}{tags} {values} {timestamp}\n')

    text_stream.detach()

influx 🔗

Store measurements in an InfluxDB

InfluxDBStorage (Storage) 🔗

Storage implementation that stores results in InfluxDB.

The identifier of the result is used as the "measurement". The results from the measured are mapped to InfluxDB fields and tags are supported as well. The constructor of the storage allows to configure, under which org and in which bucket the data will be stored. Which InfluxDB to use is configured by the url argument, authorization is done using an authorization token, that has to be generated from the InfluxDB server.

__init__(self, token, url='http://localhost:8086', org='kantai', bucket='multimeter') special 🔗

Creates a new InfluxDB storage.

Parameters:

Name Type Description Default
token str

The authentication token for InfluxDB.

required
url url

The URL to the InfluxDB instance. Defaults to 'http://localhost:8086'.

'http://localhost:8086'
org str

The id of the organization which should own the data. Defaults to 'kantai'.

'kantai'
bucket str

The bucket where the data will be stored. Defaults to 'multimeter'.

'multimeter'
Source code in multimeter/storages/influx.py
def __init__(
    self,
    token,
    url="http://localhost:8086",
    org="kantai",
    bucket="multimeter",
):
    """
    Creates a new InfluxDB storage.

    Args:
        token (str): The authentication token for InfluxDB.
        url (url): The URL to the InfluxDB instance. Defaults to
            'http://localhost:8086'.
        org (str): The id of the organization which should own the data. Defaults
            to 'kantai'.
        bucket (str): The bucket where the data will be stored. Defaults to
            'multimeter'.
    """
    if token is None:
        raise ValueError("'token' must be set.")
    self._bucket = bucket
    self._client = influxdb_client.InfluxDBClient(url, token, org=org)
    logger.info("Created influxdb client with url '%s', org '%s'.", url, org)
store(self, result) 🔗

Store the result.

Parameters:

Name Type Description Default
result multimeter.result.Result

The result to be stored.

required
Source code in multimeter/storages/influx.py
def store(self, result):
    def create_influxdb_point(identifier, point):
        influx_point = influxdb_client.Point(identifier).time(point.datetime)
        for field, value in point.values.items():
            influx_point.field(field, value)
        return influx_point

    point_settings = influxdb_client.client.write_api.PointSettings()
    for label, value in result.tags.items():
        point_settings.add_default_tag(label, value)

    write_client = self._client.write_api(
        write_options=influxdb_client.client.write_api.SYNCHRONOUS,
        point_settings=point_settings,
    )

    records = (
        create_influxdb_point(result.identifier, point) for point in result.points
    )
    logger.info("Write new measurement %s to data base", result.identifier)
    write_client.write(bucket=self._bucket, record=records)

subject 🔗

Types for representing subjects

Subject 🔗

Class representing a measurement subject.

Attributes:

Name Type Description
key str

The key how the subject is referenced in measures.

description str

Description of the subject.

visualization 🔗

Command line tool for setting up a docker based visualization

install(args) 🔗

Install the visualization container and configure them.

Parameters:

Name Type Description Default
args argparse.Namespace

The arguments parsed from command-line.

required
Source code in multimeter/visualization.py
def install(args):  # pylint: disable=too-many-locals
    """
    Install the visualization container and configure them.

    Args:
        args (argparse.Namespace): The arguments parsed from command-line.
    """

    config_file_path = pathlib.Path(args.config_file)
    if config_file_path.exists():
        logger.error("Visualization config already found at %s", args.config_file)
        sys.exit(-2)

    data_directory = pathlib.Path(args.data_dir)

    multimeter_directory = pathlib.Path(__file__).parent
    visu_resources_directory = multimeter_directory / 'visualization'
    grafana_resources_directory = visu_resources_directory / 'grafana'
    grafana_provisioning_directory = grafana_resources_directory / 'provisioning'
    grafana_dashboards_templates_directory = grafana_resources_directory / 'dashboards'

    influx_resources_directory = visu_resources_directory / 'influxdb'

    org = args.org
    data_bucket = args.bucket

    data_directory.mkdir(parents=True, exist_ok=True)
    influx_dir = data_directory / 'influxdb'
    influx_dir.mkdir(exist_ok=True)
    influx_data_dir = influx_dir / 'data'
    influx_data_dir.mkdir(exist_ok=True)
    influx_config_dir = influx_dir / 'config'
    influx_config_dir.mkdir(exist_ok=True)

    grafana_dir = data_directory / 'grafana'

    new_env = {
        'UID': str(os.getuid()),
        'DATA_DIR': str(data_directory.absolute()),
        'INFLUX_ADMIN': 'admin',
        'INFLUX_PASSWORD': 'admin123',
        'INFLUX_ORG': org,
        'INFLUX_BUCKET': data_bucket,
        'GRAFANA_PROVISIONING_DIR': str(grafana_provisioning_directory.absolute()),
    }

    # Starting only influxdb
    logger.info("Create and start influxdb...")
    _execute_docker_command('up', '-d', 'influxdb2', env=new_env)

    logger.info("Waiting for influxdb to come up...")
    time.sleep(5)

    #  create authorization token
    logger.info("Create influxdb authorization token for grafana")
    output = _execute_docker_command(
        'exec',
        'influxdb2',
        'influx',
        'auth',
        'create',
        '-o',
        org,
        '-d',
        '"Token for Grafana access"',
        '--all-access',
        '--json',
        env=new_env,
    )
    json_result = json.loads(output)
    token = json_result['token']
    logger.debug("Created token %s", token)
    new_env['GRAFANA_INFLUX_DATA_SOURCE_TOKEN'] = token

    #  load example data
    logger.info("Load example data into influxdb")
    _load_example_data(influx_data_dir, influx_resources_directory)
    _execute_docker_command(
        'exec',
        'influxdb2',
        'influx',
        'write',
        '-o',
        org,
        '-b',
        data_bucket,
        '-t',
        token,
        '-f',
        '/var/lib/influxdb2/example.line',
        env=new_env,
    )

    _copy_dashboards_from_templates(
        data_bucket, grafana_dashboards_templates_directory, grafana_dir
    )

    logger.info("Create and start grafana...")
    _execute_docker_command('up', '-d', 'grafana8', env=new_env)

    _save_visu_config(config_file_path, new_env)

    _print_instructions()

main(args=None) 🔗

Main function for the visualization tool.

Source code in multimeter/visualization.py
def main(args=None):
    """
    Main function for the visualization tool.
    """
    multimeter_dir = pathlib.Path.home() / '.multimeter'
    multimeter_dir.mkdir(exist_ok=True)
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(logging.Formatter(fmt='%(message)s'))
    stdout_handler.level = logging.INFO
    file_handler = logging.FileHandler(multimeter_dir / 'visu.log')
    file_handler.level = logging.DEBUG
    file_handler.setFormatter(
        logging.Formatter(fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    logging.basicConfig(
        level=logging.DEBUG,
        handlers=[stdout_handler, file_handler],
    )

    logger.debug("Command line arguments: %s", args)

    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.set_defaults(command=lambda _: parser.print_help())
    parser.add_argument(
        '-c',
        '--config',
        dest='config_file',
        help="The file where the config for the visualization is stored.",
        default=multimeter_dir / 'visu.config.json',
    )
    subparsers = parser.add_subparsers(help='sub-command help')
    install_parser = subparsers.add_parser(
        'install', help="Create the necessary docker container and configure them"
    )
    install_parser.add_argument(
        '-d',
        '--dir',
        dest='data_dir',
        help="The directory where the data will be stored.",
        default=multimeter_dir / 'sdocker-data',
    )
    install_parser.add_argument(
        '-o', '--org', dest='org', help="The organization in influx.", default='kantai'
    )
    install_parser.add_argument(
        '-b',
        '--bucket',
        dest='bucket',
        help="The bucket in influx for storing measurements.",
        default='multimeter',
    )
    install_parser.set_defaults(command=install)

    remove_parser = subparsers.add_parser(
        'remove', help="Remove docker containers and remove the stored data"
    )
    remove_parser.set_defaults(command=remove)

    start_parser = subparsers.add_parser('start', help="Start the visualization")
    start_parser.set_defaults(command=start)

    stop_parser = subparsers.add_parser('stop', help="Stop the visualization")
    stop_parser.set_defaults(command=stop)

    _check_docker_compose_available()

    args = parser.parse_args(args)
    args.command(args)

remove(args) 🔗

Remove the visualization container and delete the data stored for the visu.

Parameters:

Name Type Description Default
args argparse.Namespace

The arguments parsed from command-line.

required
Source code in multimeter/visualization.py
def remove(args):
    """
    Remove the visualization container and delete the data stored for the visu.

    Args:
        args (argparse.Namespace): The arguments parsed from command-line.
    """

    config_file_path = pathlib.Path(args.config_file)
    if not config_file_path.exists():
        logger.error("No config file found at %s", args.config_file)
        sys.exit(-3)

    with open(config_file_path, 'r', encoding='utf-8') as stream:
        environment = json.load(stream)['environment']

    logger.info("Remove docker containers...")
    _execute_docker_command('down', env=environment)

    logger.info("Remove data directory %s", str(environment['DATA_DIR']))
    shutil.rmtree(environment['DATA_DIR'])

    logger.info("Remove visualization config in %s", str(config_file_path))
    config_file_path.unlink()

start(args) 🔗

Start the visualization container.

Parameters:

Name Type Description Default
args argparse.Namespace

The arguments parsed from command-line.

required
Source code in multimeter/visualization.py
def start(args):
    """
    Start the visualization container.

    Args:
        args (argparse.Namespace): The arguments parsed from command-line.
    """

    config_file_path = pathlib.Path(args.config_file)
    if not config_file_path.exists():
        logger.error("No config file found at %s", args.config_file)
        sys.exit(-3)

    with open(config_file_path, 'r', encoding='utf-8') as stream:
        environment = json.load(stream)['environment']

    logger.info("Starting docker containers...")
    _execute_docker_command('start', env=environment)

stop(args) 🔗

Stop the visualization container.

Parameters:

Name Type Description Default
args argparse.Namespace

The arguments parsed from command-line.

required
Source code in multimeter/visualization.py
def stop(args):
    """
    Stop the visualization container.

    Args:
        args (argparse.Namespace): The arguments parsed from command-line.
    """

    config_file_path = pathlib.Path(args.config_file)
    if not config_file_path.exists():
        logger.error("No config file found at %s", args.config_file)
        sys.exit(-3)

    with open(config_file_path, 'r', encoding='utf-8') as stream:
        environment = json.load(stream)['environment']

    logger.info("Stopping docker containers...")
    _execute_docker_command('stop', env=environment)

Last update: 2021-11-28