API
Measure resource usage of your python code
measure
🔗
Contains Measure class
Measure
🔗
Class that represents a metric that is gathered on a subject.
Attributes:
Name | Type | Description |
---|---|---|
key |
str |
The (unique) key that is used for referencing a measure. The key is
derived from the keys of the subject and the metric in the form
|
subject |
multimeter.subject.Subject |
The subject on which the measure gathers data. |
metric |
multimeter.metric.Metric |
The metric which is gathered. |
measurement
🔗
Measuring run-time properties
Measurement
🔗
Class that is used for a single series of measurement.
Attributes:
Name | Type | Description |
---|---|---|
identifier |
str |
The (unique) identifier for this series of measurement. |
tags |
Dict[str,str] |
A set of user-defined tags that can be used for later differentiation between different series. |
result |
multimeter.result.Result |
The result of the series of measurement containing the measured values. |
__init__(self, multimeter, identifier, **tags)
special
🔗
Creates a new measurement.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
multimeter |
multimeter.multimeter.Multimeter |
The multimeter whose configuration should be used for this measurement. |
required |
identifier |
The (unique) identifier for this series of measurement. If
|
required | |
**tags |
Dict[str,str] |
A set of user-defined tags that can be used for later differentiation between different measurements. |
{} |
Source code in multimeter/measurement.py
def __init__(self, multimeter, identifier, **tags):
"""
Creates a new measurement.
Args:
multimeter (multimeter.multimeter.Multimeter): The multimeter whose
configuration should be used for this measurement.
identifier: The (unique) identifier for this series of measurement. If
`None`, a unique identifier is generated from the current time.
**tags (Dict[str,str]): A set of user-defined tags that can be used
for later differentiation between different measurements.
"""
self.identifier = identifier or str(time.time())
self.tags = tags
self.result = None
self._probes = multimeter.probes
self._storage = multimeter.storage
self._thread = _MeasuringThread(self._sample, cycle_time=multimeter.cycle_time)
self._time_last_sample = None
add_mark(self, label)
🔗
Add a new mark for the current time.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
label |
str |
The label of the mark. |
required |
Source code in multimeter/measurement.py
def add_mark(self, label):
"""
Add a new mark for the current time.
Args:
label (str): The label of the mark.
"""
timestamp = datetime.datetime.now(datetime.timezone.utc)
self.result.add_mark(timestamp, label)
end(self)
🔗
Ends the process of gathering run-time property values.
Source code in multimeter/measurement.py
def end(self):
"""
Ends the process of gathering run-time property values.
"""
self._thread.end()
self._thread.join()
for probe in self._probes:
probe.end()
self._storage.store(self.result)
start(self)
🔗
Starts the process of gathering run-time property values.
Returns:
Type | Description |
---|---|
multimeter.result.Result |
The result of this series of measurement. |
Source code in multimeter/measurement.py
def start(self):
"""
Starts the process of gathering run-time property values.
Returns:
multimeter.result.Result: The result of this series of measurement.
"""
self.result = Result(*self._probes, identifier=self.identifier, tags=self.tags)
self._time_last_sample = time.monotonic()
for probe in self._probes:
probe.start()
self._thread.start()
return self.result
metric
🔗
Metrics that define the type of measured values
Metric
🔗
Class representing a metric describing the type of measured values.
Attributes:
Name | Type | Description |
---|---|---|
key |
str |
The (unique) key that is used for referencing a metric. |
description |
str |
A human readable description of the metric. Defaults to the
empty string |
unit |
str |
The unit in which the value of metric is expressed. Defaults to
the empty string |
value_type |
type |
The python type used for the values of this metric.
Defaults to |
min_value |
value_type |
The minimum value that this metric can produce.
Defaults to |
max_value |
value_type |
The maximum value that this metric can produce.
Defaults to |
limit_value(self, value)
🔗
Make sure the value is in the limits of this metric.
If the given value is larger than max_value
or smaller than min_value
the
value is capped.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
value_type |
The value tp limit. |
required |
Returns:
Type | Description |
---|---|
value_type |
The limited value. |
Source code in multimeter/metric.py
def limit_value(self, value):
"""
Make sure the value is in the limits of this metric.
If the given value is larger than `max_value` or smaller than `min_value` the
value is capped.
Args:
value (value_type): The value tp limit.
Returns:
value_type: The limited value.
"""
if self.min_value is not None:
value = max(self.min_value, value)
if self.max_value is not None:
value = min(self.max_value, value)
return value
multimeter
🔗
The module defining the Multimeter class
Multimeter
🔗
A Multimeter
object allows to measure run-time properties of python code.
The multi meter uses multimeter.probe.Probe
objects that are used for actually
measuring some metrics on individual subjects. Those probes are defined, when a
new instance of Multimeter
is created.
A new series of measurement is created when the method measure()
is being called.
The measuring process itself is started by calling start()
and runs until it is
ended by calling end()
. The measuring happens in a different thread so that it
runs in the background automatically. When the measurement is ended, its
multimeter.result.Result
can be retrieved from the measurement itself.
Examples:
>>> multimeter = Multimeter(ResourceProbe())
>>> measurement = multimeter.measure()
>>> measurement.start()
>>> here_my_code_to_be_measured()
...
>>> measurement.end()
>>> result = measurement.result
Alternatively, Measurement
can be used as a context manager, that returns the
result:
Examples:
>>> multimeter = Multimeter(ResourceProbe())
>>> with multimeter.measure() as result:
... here_my_code_to_be_measured()
...
cycle_time
property
readonly
🔗
Returns the cycle time in seconds, how often data is gathered.
probes
property
readonly
🔗
Returns the probes that gather the data.
storage
property
readonly
🔗
Returns the storage.
__init__(self, *probes, *, cycle_time=1.0, storage=DummyStorage())
special
🔗
Create a new multi meter.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*probes |
multimeter.probe.Probe |
A list of probes that measure. |
() |
cycle_time |
float |
The time in seconds between two measure points.
Defaults to |
1.0 |
storage |
multimeter.storage.Storage |
A instance of |
DummyStorage() |
Source code in multimeter/multimeter.py
def __init__(self, *probes, cycle_time=1.0, storage=DummyStorage()):
"""
Create a new multi meter.
Args:
*probes (multimeter.probe.Probe): A list of probes that measure.
cycle_time (float): The time in seconds between two measure points.
Defaults to `1.0`. which means measure every second.
storage (multimeter.storage.Storage): A instance of `Storage` that takes
care of storing the final result. Defaults to an implementation that
doesn't do anything.
"""
self._cycle_time = cycle_time
self._probes = [*probes]
self._storage = storage
add_probes(self, *probes)
🔗
Add probes that gather data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*probes |
multimeter.probe.Probe |
Objects that implement the Probe protocol. |
() |
Returns:
Type | Description |
---|---|
Multimeter |
The multimeter. |
Source code in multimeter/multimeter.py
def add_probes(self, *probes):
"""
Add probes that gather data.
Args:
*probes (multimeter.probe.Probe): Objects that implement the Probe
protocol.
Returns:
Multimeter: The multimeter.
"""
self._probes.extend(probes)
return self
measure(self, identifier=None, **tags)
🔗
Create a new measurement based on the multimeters configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
identifier |
str |
The (unique) identifier for this measurement. |
None |
**tags |
Dict[str,str] |
Optional user-defined tags, that can be used later for identifying the measurement. |
{} |
Returns:
Type | Description |
---|---|
multimeter.measurement.Measurement |
The new measurement object which is used for starting and ending the process. |
Source code in multimeter/multimeter.py
def measure(self, identifier=None, **tags):
"""
Create a new measurement based on the multimeters configuration.
Args:
identifier (str): The (unique) identifier for this measurement.
**tags (Dict[str,str]): Optional user-defined tags, that can be used
later for identifying the measurement.
Returns:
multimeter.measurement.Measurement: The new measurement object which is
used for starting and ending the process.
"""
return Measurement(self, identifier, **tags)
set_cycle_time(self, cycle_time)
🔗
Sets the cycle time, how often data is gathered.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cycle_time |
float |
The time between two sequential data gatherings. |
required |
Returns:
Type | Description |
---|---|
Multimeter |
The multimeter. |
Source code in multimeter/multimeter.py
def set_cycle_time(self, cycle_time):
"""
Sets the cycle time, how often data is gathered.
Args:
cycle_time (float): The time between two sequential data gatherings.
Returns:
Multimeter: The multimeter.
"""
self._cycle_time = cycle_time
return self
set_storage(self, storage)
🔗
Sets the storage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
storage |
multimeter.storage.Storage |
The storage that stores the results. |
required |
Returns:
Type | Description |
---|---|
Multimeter |
The multimeter. |
Source code in multimeter/multimeter.py
def set_storage(self, storage):
"""
Sets the storage.
Args:
storage (multimeter.storage.Storage): The storage that stores the results.
Returns:
Multimeter: The multimeter.
"""
self._storage = storage
return self
probe
🔗
Contains the protocol for probes which gather the data
Probe (ABC)
🔗
Protocol that is used for gathering run-time properties.
measures
property
readonly
🔗
The measures that will be gathered.
Returns:
Type | Description |
---|---|
tuple[multimeter.measure.Measure] |
A tuple of measures from which data is gathered by this probe. |
metrics
property
readonly
🔗
The metrics used by this probe.
Returns:
Type | Description |
---|---|
tuple[multimeter.metric.Metric] |
A tuple of metrics that are gathered by this probe. |
subjects
property
readonly
🔗
The subjects from which measures will be gathered.
Returns:
Type | Description |
---|---|
tuple[multimeter.subject.Subject] |
A tuple of subjects from which data is gathered by this probe. |
end(self)
🔗
Signals to the probe that the gathering process is ended.
Source code in multimeter/probe.py
def end(self):
"""
Signals to the probe that the gathering process is ended.
"""
sample(self, values, time_elapsed)
🔗
Sample all measures of this probe and store their current values.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values |
Dict[str,any] |
The dictionary where the values should be stored. The probe is expected to add the values for every measure under their key. |
required |
time_elapsed |
float |
Time elapsed in seconds since the last sampling. |
required |
Source code in multimeter/probe.py
@abc.abstractmethod
def sample(self, values, time_elapsed):
"""
Sample all measures of this probe and store their current values.
Args:
values (Dict[str,any]): The dictionary where the values should be stored.
The probe is expected to add the values for every measure under their
key.
time_elapsed (float): Time elapsed in seconds since the last sampling.
"""
start(self)
🔗
Signals to the probe that it should start the gathering process.
Source code in multimeter/probe.py
def start(self):
"""
Signals to the probe that it should start the gathering process.
"""
ResourceProbe (Probe)
🔗
Probe which gathers data using the python resource
module.
measures
property
readonly
🔗
The measures that will be gathered.
Returns:
Type | Description |
---|---|
tuple[multimeter.measure.Measure] |
A tuple of measures from which data is gathered by this probe. |
metrics
property
readonly
🔗
The metrics used by this probe.
Returns:
Type | Description |
---|---|
tuple[multimeter.metric.Metric] |
A tuple of metrics that are gathered by this probe. |
subjects
property
readonly
🔗
The subjects from which measures will be gathered.
Returns:
Type | Description |
---|---|
tuple[multimeter.subject.Subject] |
A tuple of subjects from which data is gathered by this probe. |
sample(self, values, time_elapsed)
🔗
Sample all measures of this probe and store their current values.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values |
Dict[str,any] |
The dictionary where the values should be stored. The probe is expected to add the values for every measure under their key. |
required |
time_elapsed |
float |
Time elapsed in seconds since the last sampling. |
required |
Source code in multimeter/probe.py
def sample(self, values, time_elapsed):
if self._last_process_values is None or self._last_children_values is None:
raise RuntimeError("Need to start before sampling.")
usage = resource.getrusage(resource.RUSAGE_SELF)
self._add_to_data(
values, 'process.', usage, self._last_process_values, time_elapsed
)
self._last_process_values = usage
usage = resource.getrusage(resource.RUSAGE_CHILDREN)
self._add_to_data(
values, 'children.', usage, self._last_children_values, time_elapsed
)
self._last_children_values = usage
start(self)
🔗
Signals to the probe that it should start the gathering process.
Source code in multimeter/probe.py
def start(self):
self._last_process_values = resource.getrusage(resource.RUSAGE_SELF)
self._last_children_values = resource.getrusage(resource.RUSAGE_CHILDREN)
result
🔗
Contains classes for representing the results of a measurement
Mark
🔗
Class that contains a mark for a specified time with an annotation.
Attributes:
Name | Type | Description |
---|---|---|
datetime |
datetime.datetime |
The timestamp in UTC. |
label |
str |
The annotation of the mark. |
Point
🔗
Class that contains the values of a single measurement point.
Attributes:
Name | Type | Description |
---|---|---|
datetime |
datetime.datetime |
The timestamp in UTC. |
values |
dict[string, any] |
The |
Result
🔗
Class representing results of a measurement.
Attributes:
Name | Type | Description |
---|---|---|
identifier |
str |
A string, that identifies the measurement. |
tags |
Dict[string,string] |
A set of user-defined tags with arbitrary string values. |
meta_data |
Dict[string,string] |
A dictionary with meta data about the measurement. |
schema |
multimeter.result.Schema |
The schema of the values that are used in this result. |
points |
tuple[multimeter.result.Point |
A list of measurement points, which contain the measured values with their timestamp. |
marks |
tuple[multimeter.result.Mark |
A tuple of marks that were set at specific times during measuring. |
duration
property
readonly
🔗
Returns:
Type | Description |
---|---|
datetime.timedelta |
The duration between first and last measurement or
|
end
property
readonly
🔗
Returns:
Type | Description |
---|---|
datetime.datetime |
The timestamp of the last measurement or |
marks
property
readonly
🔗
Returns a tuple containing the marks
points
property
readonly
🔗
Returns a tuple containing the points
start
property
readonly
🔗
Returns:
Type | Description |
---|---|
datetime.datetime |
The timestamp of the first measurement or |
add_mark(self, timestamp, label)
🔗
Add a new mark for the given timestamp.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timestamp |
datetime.datetime |
The timestamp when the values were sampled. |
required |
label |
str |
The label of the mark. |
required |
Source code in multimeter/result.py
def add_mark(self, timestamp, label):
"""
Add a new mark for the given timestamp.
Args:
timestamp (datetime.datetime): The timestamp when the values were sampled.
label (str): The label of the mark.
"""
self._marks.append(Mark(timestamp, label))
add_meta_data(self, **meta_data)
🔗
Adds meta data to the result.
Setting a value for a key the second time, will overwrite the value set the first time.
Source code in multimeter/result.py
def add_meta_data(self, **meta_data):
"""
Adds meta data to the result.
Setting a value for a key the second time, will overwrite the value set the
first time.
Keyword Args:
**meta_data: The set of meta data in form of keyword args that should be
added to the result.
"""
self.meta_data.update(**meta_data)
append(self, timestamp, values)
🔗
Add a new point with measured values to the result.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timestamp |
datetime.datetime |
The timestamp when the values were sampled. |
required |
values |
Dict[str,any] |
The values. |
required |
Source code in multimeter/result.py
def append(self, timestamp, values):
"""
Add a new point with measured values to the result.
Args:
timestamp (datetime.datetime): The timestamp when the values were sampled.
values (Dict[str,any]): The values.
"""
self._points.append(Point(timestamp, values))
values(self, key)
🔗
Returns all values of a specific measure.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
Key of the measure for which all values should be returned. |
required |
Yields
any: A value of the type defined by the metric.
Source code in multimeter/result.py
def values(self, key):
"""
Returns all values of a specific measure.
Args:
key (str): Key of the measure for which all values should be returned.
Yields:
any: A value of the type defined by the metric.
"""
return (point.values[key] for point in self._points)
Schema
🔗
Class that represents the schema of the values in a result.
The schema consists of the used metrics, the subjects of the measures and the measures, which define which metric is measured on which subject.
Attributes:
Name | Type | Description |
---|---|---|
metrics |
tuple[multimeter.metric.Metric |
A list of metrics which are measured. |
subjects |
tuple[multimeter.subject.Subject |
A list of subjects on which some metrics were measured. |
measures |
tuple[multimeter.measure.Measure |
A list of measures, which contains which metrics were measured on which subjects. |
storages
special
🔗
base
🔗
Base class Storage for measurement results
Storage (ABC)
🔗
Base class for implementing storages, which store the results of measurements.
store(self, result)
🔗
Store the result.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
result |
multimeter.result.Result |
The result to be stored. |
required |
Source code in multimeter/storages/base.py
@abc.abstractmethod
def store(self, result):
"""
Store the result.
Args:
result (multimeter.result.Result): The result to be stored.
"""
dummy
🔗
Storage for measurement results
file
🔗
Storage that stores results in files
FileFormat (ABC)
🔗
Abstract base class for the different file format implementations.
extension
property
readonly
🔗
Returns the file extensions for the file, e.g. '.json'.
save_result_to_stream(self, result, stream)
🔗
Saves the result to a binary stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
result |
multimeter.result.Result |
The result. |
required |
stream |
io.RawIOBase |
Binary stream where the result is written to. |
required |
Source code in multimeter/storages/file.py
@abc.abstractmethod
def save_result_to_stream(self, result, stream):
"""
Saves the result to a binary stream.
Args:
result (multimeter.result.Result): The result.
stream (io.RawIOBase): Binary stream where the result is written to.
"""
FileStorage (Storage)
🔗
Storage implementation that stores results as JSON files in a directory.
Attributes:
Name | Type | Description |
---|---|---|
save_directory |
pathlib.Path |
The path to the directory where the results are stored. |
file_format |
multimeter.storages.file.FileFormat |
The file format to use. |
save_directory
property
readonly
🔗
Returns the save directory.
__init__(self, save_directory, file_format)
special
🔗
Creates a new file storage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
save_directory |
Union[str,pathlib.Path] |
The path to a directory, where the json files will be stored. |
required |
file_format |
FileFormat |
The format to use for the file. |
required |
Source code in multimeter/storages/file.py
def __init__(self, save_directory, file_format):
"""
Creates a new file storage.
Args:
save_directory (Union[str,pathlib.Path]): The path to a directory, where
the json files will be stored.
file_format (FileFormat): The format to use for the file.
"""
if save_directory is None:
raise ValueError("'save_directory' must be set.")
if not isinstance(save_directory, pathlib.Path):
save_directory = pathlib.Path(save_directory)
self._save_directory = save_directory
self._file_format = file_format
store(self, result)
🔗
Store the result.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
result |
multimeter.result.Result |
The result to be stored. |
required |
Source code in multimeter/storages/file.py
def store(self, result):
if not self._save_directory.exists():
self._save_directory.mkdir(parents=True)
result_file_path = self._save_directory / (
result.identifier + self._file_format.extension
)
with open(result_file_path, 'bw') as stream:
self._file_format.save_result_to_stream(result, stream)
JsonFormat (FileFormat)
🔗
Format implementation that stores results as JSON.
extension
property
readonly
🔗
Returns the file extensions for the file, e.g. '.json'.
save_result_to_stream(self, result, stream)
🔗
Saves the result to a (binary) stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
result |
multimeter.result.Result |
The result to save. |
required |
stream |
io.RawIOBase |
The binary stream into which the result data should be written. The stream is not automatically closed. |
required |
Source code in multimeter/storages/file.py
def save_result_to_stream(self, result, stream):
"""
Saves the result to a (binary) stream.
Args:
result (multimeter.result.Result): The result to save.
stream (io.RawIOBase): The binary stream into which the result data should
be written. The stream is not automatically closed.
"""
text_stream = io.TextIOWrapper(stream, encoding='utf-8')
metrics_json = _SerializableGenerator(
{
'key': o.key,
'description': o.description,
'unit': o.unit,
'value_type': o.value_type.__name__,
'min_value': o.min_value,
'max_value': o.max_value,
}
for o in result.schema.metrics
)
subjects_json = _SerializableGenerator(
{
'key': o.key,
'description': o.description,
}
for o in result.schema.subjects
)
measures_json = _SerializableGenerator(
{
'key': o.key,
'subject': o.subject.key,
'metric': o.metric.key,
}
for o in result.schema.measures
)
points_json = _SerializableGenerator(
{
'datetime': o.datetime.isoformat(timespec='milliseconds'),
'values': o.values,
}
for o in result.points
)
marks_json = _SerializableGenerator(
{
'datetime': o.datetime.isoformat(timespec='milliseconds'),
'label': o.label,
}
for o in result.marks
)
result_json = {
'identifier': result.identifier,
'tags': result.tags,
'meta_data': result.meta_data,
'schema': {
'metrics': metrics_json,
'subjects': subjects_json,
'measures': measures_json,
},
'points': points_json,
'marks': marks_json,
}
json.dump(result_json, text_stream, indent=2)
text_stream.detach()
LineFormat (FileFormat)
🔗
Format implementation that stores results as Line format.
The format is used by time series databases like InfluxDB. For more information: https://docs.influxdata.com/influxdb/v2.1/reference/syntax/line-protocol/#string
extension
property
readonly
🔗
Returns the file extensions for the file, e.g. '.json'.
save_result_to_stream(self, result, stream)
🔗
Saves the result to a (binary) stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
result |
multimeter.result.Result |
The result to save. |
required |
stream |
io.RawIOBase |
The binary stream into which the result data should be written. The stream is not automatically closed. |
required |
Source code in multimeter/storages/file.py
def save_result_to_stream(self, result, stream):
"""
Saves the result to a (binary) stream.
Args:
result (multimeter.result.Result): The result to save.
stream (io.RawIOBase): The binary stream into which the result data should
be written. The stream is not automatically closed.
"""
self._check_measurement_identifier(result.identifier)
text_stream = io.TextIOWrapper(stream, encoding='utf-8')
tags = ''.join(
f",{self._escape_tag(key)}={self._escape_tag(value)}"
for key, value in result.tags.items()
)
identifier = self._escape_identifier(result.identifier)
for point in result.points:
values = ','.join(
f'{self._escape_field_key(key)}={self._value_as_string(value)}'
for key, value in point.values.items()
)
# convert from seconds to nano
timestamp = int(point.datetime.timestamp() * 1000 * 1000 * 1000)
text_stream.write(f'{identifier}{tags} {values} {timestamp}\n')
text_stream.detach()
influx
🔗
Store measurements in an InfluxDB
InfluxDBStorage (Storage)
🔗
Storage implementation that stores results in InfluxDB.
The identifier of the result is used as the "measurement". The results from the
measured are mapped to InfluxDB fields and tags are supported as well. The
constructor of the storage allows to configure, under which org and in which
bucket the data will be stored. Which InfluxDB to use is configured by the url
argument, authorization is done using an authorization token, that has to be
generated from the InfluxDB server.
__init__(self, token, url='http://localhost:8086', org='kantai', bucket='multimeter')
special
🔗
Creates a new InfluxDB storage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token |
str |
The authentication token for InfluxDB. |
required |
url |
url |
The URL to the InfluxDB instance. Defaults to 'http://localhost:8086'. |
'http://localhost:8086' |
org |
str |
The id of the organization which should own the data. Defaults to 'kantai'. |
'kantai' |
bucket |
str |
The bucket where the data will be stored. Defaults to 'multimeter'. |
'multimeter' |
Source code in multimeter/storages/influx.py
def __init__(
self,
token,
url="http://localhost:8086",
org="kantai",
bucket="multimeter",
):
"""
Creates a new InfluxDB storage.
Args:
token (str): The authentication token for InfluxDB.
url (url): The URL to the InfluxDB instance. Defaults to
'http://localhost:8086'.
org (str): The id of the organization which should own the data. Defaults
to 'kantai'.
bucket (str): The bucket where the data will be stored. Defaults to
'multimeter'.
"""
if token is None:
raise ValueError("'token' must be set.")
self._bucket = bucket
self._client = influxdb_client.InfluxDBClient(url, token, org=org)
logger.info("Created influxdb client with url '%s', org '%s'.", url, org)
store(self, result)
🔗
Store the result.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
result |
multimeter.result.Result |
The result to be stored. |
required |
Source code in multimeter/storages/influx.py
def store(self, result):
def create_influxdb_point(identifier, point):
influx_point = influxdb_client.Point(identifier).time(point.datetime)
for field, value in point.values.items():
influx_point.field(field, value)
return influx_point
point_settings = influxdb_client.client.write_api.PointSettings()
for label, value in result.tags.items():
point_settings.add_default_tag(label, value)
write_client = self._client.write_api(
write_options=influxdb_client.client.write_api.SYNCHRONOUS,
point_settings=point_settings,
)
records = (
create_influxdb_point(result.identifier, point) for point in result.points
)
logger.info("Write new measurement %s to data base", result.identifier)
write_client.write(bucket=self._bucket, record=records)
subject
🔗
Types for representing subjects
Subject
🔗
Class representing a measurement subject.
Attributes:
Name | Type | Description |
---|---|---|
key |
str |
The key how the subject is referenced in measures. |
description |
str |
Description of the subject. |
visualization
🔗
Command line tool for setting up a docker based visualization
install(args)
🔗
Install the visualization container and configure them.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
argparse.Namespace |
The arguments parsed from command-line. |
required |
Source code in multimeter/visualization.py
def install(args): # pylint: disable=too-many-locals
"""
Install the visualization container and configure them.
Args:
args (argparse.Namespace): The arguments parsed from command-line.
"""
config_file_path = pathlib.Path(args.config_file)
if config_file_path.exists():
logger.error("Visualization config already found at %s", args.config_file)
sys.exit(-2)
data_directory = pathlib.Path(args.data_dir)
multimeter_directory = pathlib.Path(__file__).parent
visu_resources_directory = multimeter_directory / 'visualization'
grafana_resources_directory = visu_resources_directory / 'grafana'
grafana_provisioning_directory = grafana_resources_directory / 'provisioning'
grafana_dashboards_templates_directory = grafana_resources_directory / 'dashboards'
influx_resources_directory = visu_resources_directory / 'influxdb'
org = args.org
data_bucket = args.bucket
data_directory.mkdir(parents=True, exist_ok=True)
influx_dir = data_directory / 'influxdb'
influx_dir.mkdir(exist_ok=True)
influx_data_dir = influx_dir / 'data'
influx_data_dir.mkdir(exist_ok=True)
influx_config_dir = influx_dir / 'config'
influx_config_dir.mkdir(exist_ok=True)
grafana_dir = data_directory / 'grafana'
new_env = {
'UID': str(os.getuid()),
'DATA_DIR': str(data_directory.absolute()),
'INFLUX_ADMIN': 'admin',
'INFLUX_PASSWORD': 'admin123',
'INFLUX_ORG': org,
'INFLUX_BUCKET': data_bucket,
'GRAFANA_PROVISIONING_DIR': str(grafana_provisioning_directory.absolute()),
}
# Starting only influxdb
logger.info("Create and start influxdb...")
_execute_docker_command('up', '-d', 'influxdb2', env=new_env)
logger.info("Waiting for influxdb to come up...")
time.sleep(5)
# create authorization token
logger.info("Create influxdb authorization token for grafana")
output = _execute_docker_command(
'exec',
'influxdb2',
'influx',
'auth',
'create',
'-o',
org,
'-d',
'"Token for Grafana access"',
'--all-access',
'--json',
env=new_env,
)
json_result = json.loads(output)
token = json_result['token']
logger.debug("Created token %s", token)
new_env['GRAFANA_INFLUX_DATA_SOURCE_TOKEN'] = token
# load example data
logger.info("Load example data into influxdb")
_load_example_data(influx_data_dir, influx_resources_directory)
_execute_docker_command(
'exec',
'influxdb2',
'influx',
'write',
'-o',
org,
'-b',
data_bucket,
'-t',
token,
'-f',
'/var/lib/influxdb2/example.line',
env=new_env,
)
_copy_dashboards_from_templates(
data_bucket, grafana_dashboards_templates_directory, grafana_dir
)
logger.info("Create and start grafana...")
_execute_docker_command('up', '-d', 'grafana8', env=new_env)
_save_visu_config(config_file_path, new_env)
_print_instructions()
main(args=None)
🔗
Main function for the visualization tool.
Source code in multimeter/visualization.py
def main(args=None):
"""
Main function for the visualization tool.
"""
multimeter_dir = pathlib.Path.home() / '.multimeter'
multimeter_dir.mkdir(exist_ok=True)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(logging.Formatter(fmt='%(message)s'))
stdout_handler.level = logging.INFO
file_handler = logging.FileHandler(multimeter_dir / 'visu.log')
file_handler.level = logging.DEBUG
file_handler.setFormatter(
logging.Formatter(fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
logging.basicConfig(
level=logging.DEBUG,
handlers=[stdout_handler, file_handler],
)
logger.debug("Command line arguments: %s", args)
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.set_defaults(command=lambda _: parser.print_help())
parser.add_argument(
'-c',
'--config',
dest='config_file',
help="The file where the config for the visualization is stored.",
default=multimeter_dir / 'visu.config.json',
)
subparsers = parser.add_subparsers(help='sub-command help')
install_parser = subparsers.add_parser(
'install', help="Create the necessary docker container and configure them"
)
install_parser.add_argument(
'-d',
'--dir',
dest='data_dir',
help="The directory where the data will be stored.",
default=multimeter_dir / 'sdocker-data',
)
install_parser.add_argument(
'-o', '--org', dest='org', help="The organization in influx.", default='kantai'
)
install_parser.add_argument(
'-b',
'--bucket',
dest='bucket',
help="The bucket in influx for storing measurements.",
default='multimeter',
)
install_parser.set_defaults(command=install)
remove_parser = subparsers.add_parser(
'remove', help="Remove docker containers and remove the stored data"
)
remove_parser.set_defaults(command=remove)
start_parser = subparsers.add_parser('start', help="Start the visualization")
start_parser.set_defaults(command=start)
stop_parser = subparsers.add_parser('stop', help="Stop the visualization")
stop_parser.set_defaults(command=stop)
_check_docker_compose_available()
args = parser.parse_args(args)
args.command(args)
remove(args)
🔗
Remove the visualization container and delete the data stored for the visu.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
argparse.Namespace |
The arguments parsed from command-line. |
required |
Source code in multimeter/visualization.py
def remove(args):
"""
Remove the visualization container and delete the data stored for the visu.
Args:
args (argparse.Namespace): The arguments parsed from command-line.
"""
config_file_path = pathlib.Path(args.config_file)
if not config_file_path.exists():
logger.error("No config file found at %s", args.config_file)
sys.exit(-3)
with open(config_file_path, 'r', encoding='utf-8') as stream:
environment = json.load(stream)['environment']
logger.info("Remove docker containers...")
_execute_docker_command('down', env=environment)
logger.info("Remove data directory %s", str(environment['DATA_DIR']))
shutil.rmtree(environment['DATA_DIR'])
logger.info("Remove visualization config in %s", str(config_file_path))
config_file_path.unlink()
start(args)
🔗
Start the visualization container.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
argparse.Namespace |
The arguments parsed from command-line. |
required |
Source code in multimeter/visualization.py
def start(args):
"""
Start the visualization container.
Args:
args (argparse.Namespace): The arguments parsed from command-line.
"""
config_file_path = pathlib.Path(args.config_file)
if not config_file_path.exists():
logger.error("No config file found at %s", args.config_file)
sys.exit(-3)
with open(config_file_path, 'r', encoding='utf-8') as stream:
environment = json.load(stream)['environment']
logger.info("Starting docker containers...")
_execute_docker_command('start', env=environment)
stop(args)
🔗
Stop the visualization container.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
argparse.Namespace |
The arguments parsed from command-line. |
required |
Source code in multimeter/visualization.py
def stop(args):
"""
Stop the visualization container.
Args:
args (argparse.Namespace): The arguments parsed from command-line.
"""
config_file_path = pathlib.Path(args.config_file)
if not config_file_path.exists():
logger.error("No config file found at %s", args.config_file)
sys.exit(-3)
with open(config_file_path, 'r', encoding='utf-8') as stream:
environment = json.load(stream)['environment']
logger.info("Stopping docker containers...")
_execute_docker_command('stop', env=environment)