API

Python client library for the Keyraser key shredding system

This library contains a client implementation that wraps the keyraser command line binaries, which encrypt and decrypt data using the Keyraser key shredding system.

To use the library, it must know the location of the binaries. By default the library looks for them on the PATH using shutil.which(). If that fails, the location of the keyraser binaries called by the wrapper class has to be defined explicitly. This can be done in two ways:

Define the KR_BINARIES environment variable

If this environment variable is set, the library expects all binaries to be located in the directory its value points to. If a binary is missing, a RuntimeError is raised.

Define the binary paths by using set_binary_path(path, binary=None)

This function takes either the path of a directory containing all binaries, or the path of a single file together with the name of the binary that file provides. If the path is not valid or a binary can't be found at that location, a RuntimeError is raised.
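
The lookup rules above can be sketched with the standard library alone. This is not the library's implementation: `resolve_binaries` and `REQUIRED_BINARIES` are hypothetical names, the list of binaries is simply the set invoked in the source listings on this page, and the precedence (explicit directory, then `KR_BINARIES`, then PATH) is an assumption.

```python
import os
import shutil
from typing import Dict, Optional

# Assumed list: the binaries invoked by the source listings on this page.
REQUIRED_BINARIES = ("kr-encrypt", "kr-decrypt", "kr-keys")


def resolve_binaries(directory: Optional[str] = None) -> Dict[str, str]:
    """Map each binary name to a path, or raise RuntimeError if one is missing.

    Without an explicit directory, fall back to the KR_BINARIES environment
    variable and finally to a PATH lookup via shutil.which().
    """
    directory = directory or os.environ.get("KR_BINARIES")
    resolved = {}
    for name in REQUIRED_BINARIES:
        if directory is not None:
            candidate = os.path.join(directory, name)
            path = candidate if os.path.isfile(candidate) else None
        else:
            path = shutil.which(name)
        if path is None:
            raise RuntimeError(f"binary {name!r} not found")
        resolved[name] = path
    return resolved
```

With the library itself, the corresponding calls would be `set_binary_path('/opt/keyraser/bin')` for a directory, or `set_binary_path('/opt/keyraser/bin/kr-keys', 'kr-keys')` for a single binary; both paths are placeholders.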

Keyraser wrapper client

Client

Class that provides the functionality of encrypting and decrypting data.

The Client can be configured with two different values:

keystore_address contains the hostname and port of the keystore from which the client fetches the encryption keys. Defaults to None, in which case the default address built into the binary is used.

credential_path contains the path to a file that is used to authenticate the client. Defaults to None, in which case no explicit credential file is used and the credential must be provided through the KR_CREDENTIAL environment variable.

Examples:

    import keyraser_client

    client = keyraser_client.Client(
        keystore_address='localhost:1996',
        credential_path='/my/path/to/the/client.kred',
    )
Source code in keyraser_client/client.py
class Client:
    """
    Class that provides the functionality of encrypting and decrypting data.

    The `Client` can be configured with two different values:

    `keystore_address` contains the hostname and port of the keystore from where the
        client will fetch the encryption keys. Defaults to `None` in which case the
        default address used in the binary is used.
    `credential_path` contains the path to a file which is used for authenticating the
        client. Defaults to `None`. In this case no explicit credential file is used,
        which requires the credential to be provided using the `KR_CREDENTIAL`
        environment variable.

    Examples:
        ```python
            import keyraser_client

            client = keyraser_client.Client(
                keystore_address='localhost:1996',
                credential_path='/my/path/to/the/client.kred',
            )
        ```

    """

    def __init__(
        self,
        *,
        keystore_address: Optional[str] = None,
        credential_path: Optional[str] = None,
    ):
        """
        Creates a new `Client`.

        Args:
            keystore_address: The string with the address of the keystore, defaults to
                `None`.
            credential_path: The file path to the credential to authenticate the
                client, defaults to `None`.
        """
        self.keystore_address = keystore_address
        self.credential_path = credential_path

    def encrypt(
        self,
        format: Format,
        src: Union[str, pathlib.Path, io.BufferedIOBase, bytes],
        dest: Union[str, pathlib.Path, io.BufferedIOBase],
    ):
        """
        Encrypts a stream of data.

        Args:
            format: The format of the data that will be encrypted.
            src: Either a path to a file or a stream from which the data is read to be
                encrypted.
            dest: Either a path to a file or a stream to which the encrypted data will
                be written.
        """
        args = [binary_paths['kr-encrypt']]

        args.extend(format.as_cli_params())

        if self.keystore_address is not None:
            args.extend(["-address", self.keystore_address])

        if self.credential_path is not None:
            args.extend(["-credential", self.credential_path])

        src_input = create_input(src)
        args.extend(src_input.cli_params())

        dest_output = create_output(dest)
        args.extend(dest_output.cli_params())

        open_kwargs: Dict = {}
        open_kwargs.update(**src_input.popen_kwargs())
        open_kwargs.update(**dest_output.popen_kwargs())
        with subprocess.Popen(args, **open_kwargs) as process:
            while not src_input.is_done() or not dest_output.is_done():
                src_input.process(process)
                dest_output.process(process)

            process.wait()

    def decrypt(
        self,
        src: Union[str, pathlib.Path, io.BufferedReader, bytes],
        dest: Union[str, pathlib.Path, io.BufferedWriter],
    ):
        """
        Decrypts an encrypted stream of data.

        Args:
            src: Either a path to a file or a stream from which the encrypted data
                will be read.
            dest: Either a path to a file or a stream to which the decrypted data will
                be written.
        """
        args = [binary_paths['kr-decrypt']]

        if self.keystore_address is not None:
            args.extend(["-address", self.keystore_address])

        if self.credential_path is not None:
            args.extend(["-credential", self.credential_path])

        src_input = create_input(src)
        args.extend(src_input.cli_params())

        dest_output = create_output(dest)
        args.extend(dest_output.cli_params())

        open_kwargs: Dict = {}
        open_kwargs.update(**src_input.popen_kwargs())
        open_kwargs.update(**dest_output.popen_kwargs())

        with subprocess.Popen(args, **open_kwargs) as process:
            if hasattr(process.stdout, 'fileno'):
                fd = process.stdout.fileno()
                fl = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

            while not src_input.is_done() or not dest_output.is_done():
                src_input.process(process)
                dest_output.process(process)

            process.wait()

    def encrypt_block(
        self, entity_id: str, src: bytes, *, id_format: IdFormat = IdFormat.STRING_UTF8
    ) -> bytes:
        """
        Encrypts a single block of data.

        Args:
            entity_id: A string containing the entity id for which the data block will
                be encrypted.
            src: A single buffer containing the bytes to encrypt.
            id_format: The format of the entity_id. Defaults to using the utf8
                encoded string as entity id.

        Returns:
            bytes: A buffer containing the encrypted data block.
        """
        args = [binary_paths['kr-encrypt'], 'block']

        if self.keystore_address is not None:
            args.extend(["-address", self.keystore_address])

        if self.credential_path is not None:
            args.extend(["-credential", self.credential_path])

        src_input = create_input(src)
        args.extend(src_input.cli_params())

        buffer = io.BytesIO()
        dest_output = create_output(buffer)
        args.extend(dest_output.cli_params())

        open_kwargs: Dict = {}
        open_kwargs.update(**src_input.popen_kwargs())
        open_kwargs.update(**dest_output.popen_kwargs())

        args.extend(id_format.as_cli_params())
        args.extend(['-id', entity_id])

        with subprocess.Popen(args, **open_kwargs) as process:
            if hasattr(process.stdout, 'fileno'):
                fd = process.stdout.fileno()
                fl = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

            while not src_input.is_done() or not dest_output.is_done():
                src_input.process(process)
                dest_output.process(process)

            process.wait()
            if process.returncode != 0:
                raise subprocess.CalledProcessError(
                    process.returncode, process.args, stderr=process.stderr
                )

        return buffer.getvalue()

    def decrypt_block(self, src: bytes) -> bytes:
        """
        Decrypts a single block of data.

        Args:
            src: A single buffer containing the encrypted bytes to decrypt.

        Returns:
            bytes: A buffer containing the decrypted data.
        """
        args = [binary_paths['kr-decrypt'], 'block']

        if self.keystore_address is not None:
            args.extend(["-address", self.keystore_address])

        if self.credential_path is not None:
            args.extend(["-credential", self.credential_path])

        src_input = create_input(src)
        args.extend(src_input.cli_params())

        buffer = io.BytesIO()
        dest_output = create_output(buffer)
        args.extend(dest_output.cli_params())

        open_kwargs: Dict = {}
        open_kwargs.update(**src_input.popen_kwargs())
        open_kwargs.update(**dest_output.popen_kwargs())

        with subprocess.Popen(args, **open_kwargs) as process:
            if hasattr(process.stdout, 'fileno'):
                fd = process.stdout.fileno()
                fl = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

            while not src_input.is_done() or not dest_output.is_done():
                src_input.process(process)
                dest_output.process(process)

            process.wait()
            if process.returncode != 0:
                raise subprocess.CalledProcessError(
                    process.returncode, process.args, stderr=process.stderr
                )

        return buffer.getvalue()

    def create_keys(
        self, entity_ids: List[str], *, id_format: IdFormat = IdFormat.STRING_UTF8
    ):
        """
        Create keys for a list of entities.

        Args:
            entity_ids: A list of strings containing the entity ids for which the keys
                will be created.
            id_format: The format of the entity ids. Defaults to using the utf8
                encoded string as entity id.
        """
        args = [binary_paths['kr-keys'], 'create']

        if self.keystore_address is not None:
            args.extend(["-address", self.keystore_address])

        if self.credential_path is not None:
            args.extend(["-credential", self.credential_path])

        args.extend(id_format.as_cli_params())

        for entity_id in entity_ids:
            args.extend(['-id', entity_id])

        subprocess.run(args, check=True)

    def delete_keys(
        self, entity_ids: List[str], *, id_format: IdFormat = IdFormat.STRING_UTF8
    ):
        """
        Delete keys for a list of entities.

        Args:
            entity_ids: A list of strings containing the entity ids for which the keys
                will be deleted.
            id_format: The format of the entity ids. Defaults to using the utf8
                encoded string as entity id.
        """
        args = [binary_paths['kr-keys'], 'delete']

        if self.keystore_address is not None:
            args.extend(["-address", self.keystore_address])

        if self.credential_path is not None:
            args.extend(["-credential", self.credential_path])

        args.extend(id_format.as_cli_params())

        for entity_id in entity_ids:
            args.extend(['-id', entity_id])

        subprocess.run(args, check=True)
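
To show how the methods in the listing above fit together, here is a minimal sketch of a full key-shredding round trip. It only calls methods that appear in the listing; the helper name `shred_roundtrip` and the entity id are made up for illustration, and the function takes the client as a parameter so it can be tried with a stand-in object when the binaries are not installed.

```python
def shred_roundtrip(client, entity_id: str, payload: bytes):
    """Create a key, encrypt and decrypt one block, then delete the key.

    `client` is expected to offer the create_keys, encrypt_block,
    decrypt_block and delete_keys methods shown in the listing above.
    """
    client.create_keys([entity_id])
    encrypted = client.encrypt_block(entity_id, payload)
    decrypted = client.decrypt_block(encrypted)
    # After this call the key is gone; the encrypted block is meant to stay
    # unreadable from then on, which is the point of key shredding.
    client.delete_keys([entity_id])
    return encrypted, decrypted


# Usage with the real client (requires the keyraser binaries and a keystore):
#   import keyraser_client
#   client = keyraser_client.Client(keystore_address='localhost:1996')
#   encrypted, decrypted = shred_roundtrip(client, 'user-1', b'some data')
```

How the binary reports a decryption attempt after the key has been deleted is not shown on this page; the block variants raise `subprocess.CalledProcessError` on any non-zero exit code.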

__init__(*, keystore_address=None, credential_path=None)

Creates a new Client.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| keystore_address | Optional[str] | The string with the address of the keystore, defaults to None. | None |
| credential_path | Optional[str] | The file path to the credential to authenticate the client, defaults to None. | None |

Source code in keyraser_client/client.py
def __init__(
    self,
    *,
    keystore_address: Optional[str] = None,
    credential_path: Optional[str] = None,
):
    """
    Creates a new `Client`.

    Args:
        keystore_address: The string with the address of the keystore, defaults to
            `None`.
        credential_path: The file path to the credential to authenticate the
            client, defaults to `None`.
    """
    self.keystore_address = keystore_address
    self.credential_path = credential_path

create_keys(entity_ids, *, id_format=IdFormat.STRING_UTF8)

Create keys for a list of entities.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entity_ids | List[str] | A list of strings containing the entity ids for which the keys will be created. | required |
| id_format | IdFormat | The format of the entity ids. Defaults to using the utf8 encoded string as entity id. | STRING_UTF8 |

Source code in keyraser_client/client.py
def create_keys(
    self, entity_ids: List[str], *, id_format: IdFormat = IdFormat.STRING_UTF8
):
    """
    Create keys for a list of entities.

    Args:
        entity_ids: A list of strings containing the entity ids for which the keys
            will be created.
        id_format: The format of the entity ids. Defaults to using the utf8
            encoded string as entity id.
    """
    args = [binary_paths['kr-keys'], 'create']

    if self.keystore_address is not None:
        args.extend(["-address", self.keystore_address])

    if self.credential_path is not None:
        args.extend(["-credential", self.credential_path])

    args.extend(id_format.as_cli_params())

    for entity_id in entity_ids:
        args.extend(['-id', entity_id])

    subprocess.run(args, check=True)
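
The argument list assembled above can be reproduced in isolation, which makes it easier to see what command line is eventually run. The function below is a hypothetical stand-alone mirror of that logic, not part of the library; `"kr-keys"` stands in for the resolved path in `binary_paths`, and the id format is passed as a plain string instead of an `IdFormat` member.

```python
from typing import List, Optional


def build_keys_args(
    action: str,
    entity_ids: List[str],
    *,
    binary: str = "kr-keys",
    keystore_address: Optional[str] = None,
    credential_path: Optional[str] = None,
    id_format: Optional[str] = "STRING-UTF8",
) -> List[str]:
    """Assemble the argument list in the same order as the listing above."""
    args = [binary, action]
    if keystore_address is not None:
        args.extend(["-address", keystore_address])
    if credential_path is not None:
        args.extend(["-credential", credential_path])
    if id_format is not None:  # None plays the role of IdFormat.DEFAULT
        args.extend(["-idFormat", id_format])
    for entity_id in entity_ids:
        # Every entity id gets its own -id flag.
        args.extend(["-id", entity_id])
    return args


args = build_keys_args(
    "create", ["user-1", "user-2"], keystore_address="localhost:1996"
)
print(args)
```

Since the final call uses `subprocess.run(args, check=True)`, a non-zero exit code of the binary surfaces as `subprocess.CalledProcessError`.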

decrypt(src, dest)

Decrypts an encrypted stream of data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| src | Union[str, Path, BufferedReader, bytes] | Either a path to a file or a stream from which the encrypted data will be read. | required |
| dest | Union[str, Path, BufferedWriter] | Either a path to a file or a stream to which the decrypted data will be written. | required |

Source code in keyraser_client/client.py
def decrypt(
    self,
    src: Union[str, pathlib.Path, io.BufferedReader, bytes],
    dest: Union[str, pathlib.Path, io.BufferedWriter],
):
    """
    Decrypts an encrypted stream of data.

    Args:
        src: Either a path to a file or a stream from which the encrypted data
            will be read.
        dest: Either a path to a file or a stream to which the decrypted data will
            be written.
    """
    args = [binary_paths['kr-decrypt']]

    if self.keystore_address is not None:
        args.extend(["-address", self.keystore_address])

    if self.credential_path is not None:
        args.extend(["-credential", self.credential_path])

    src_input = create_input(src)
    args.extend(src_input.cli_params())

    dest_output = create_output(dest)
    args.extend(dest_output.cli_params())

    open_kwargs: Dict = {}
    open_kwargs.update(**src_input.popen_kwargs())
    open_kwargs.update(**dest_output.popen_kwargs())

    with subprocess.Popen(args, **open_kwargs) as process:
        if hasattr(process.stdout, 'fileno'):
            fd = process.stdout.fileno()
            fl = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        while not src_input.is_done() or not dest_output.is_done():
            src_input.process(process)
            dest_output.process(process)

        process.wait()
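
The listing switches the child's stdout to non-blocking mode before entering the loop, presumably so that a read never stalls while input still has to be written. The sketch below applies the same `fcntl` calls to a plain `os.pipe()` to show the effect; the helper names are hypothetical, and like the listing it only works on POSIX systems, where the `fcntl` module is available.

```python
import fcntl
import os
from typing import Optional


def set_nonblocking(fd: int) -> None:
    """Add O_NONBLOCK to the file descriptor's status flags."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def read_available(fd: int, size: int = 4096) -> Optional[bytes]:
    """Return the bytes that are ready, b'' at end of file, or None if
    nothing is available yet."""
    try:
        return os.read(fd, size)
    except BlockingIOError:
        return None


read_fd, write_fd = os.pipe()
set_nonblocking(read_fd)

first = read_available(read_fd)   # nothing has been written yet
os.write(write_fd, b"chunk")
second = read_available(read_fd)  # the data written above
os.close(write_fd)
third = read_available(read_fd)   # writer closed: end of file
os.close(read_fd)
```

Without the flag, the first read would block until the writer produced data or closed the pipe.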

decrypt_block(src)

Decrypts a single block of data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| src | bytes | A single buffer containing the encrypted bytes to decrypt. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| bytes | bytes | A buffer containing the decrypted data. |

Source code in keyraser_client/client.py
def decrypt_block(self, src: bytes) -> bytes:
    """
    Decrypts a single block of data.

    Args:
        src: A single buffer containing the encrypted bytes to decrypt.

    Returns:
        bytes: A buffer containing the decrypted data.
    """
    args = [binary_paths['kr-decrypt'], 'block']

    if self.keystore_address is not None:
        args.extend(["-address", self.keystore_address])

    if self.credential_path is not None:
        args.extend(["-credential", self.credential_path])

    src_input = create_input(src)
    args.extend(src_input.cli_params())

    buffer = io.BytesIO()
    dest_output = create_output(buffer)
    args.extend(dest_output.cli_params())

    open_kwargs: Dict = {}
    open_kwargs.update(**src_input.popen_kwargs())
    open_kwargs.update(**dest_output.popen_kwargs())

    with subprocess.Popen(args, **open_kwargs) as process:
        if hasattr(process.stdout, 'fileno'):
            fd = process.stdout.fileno()
            fl = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        while not src_input.is_done() or not dest_output.is_done():
            src_input.process(process)
            dest_output.process(process)

        process.wait()
        if process.returncode != 0:
            raise subprocess.CalledProcessError(
                process.returncode, process.args, stderr=process.stderr
            )

    return buffer.getvalue()
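
The block variants raise `subprocess.CalledProcessError` when the binary exits with a non-zero code, so callers need to be prepared to catch it. The sketch below reproduces that pattern with the Python interpreter as a stand-in child process. It is not the library's code: `run_checked` is a hypothetical helper, it uses `communicate()` instead of the pump loop shown above, and it attaches the captured stderr bytes to the exception where the listing passes `process.stderr`.

```python
import subprocess
import sys


def run_checked(args, data: bytes) -> bytes:
    """Feed `data` to a child process and return its stdout, raising
    CalledProcessError if the child exits with a non-zero code."""
    with subprocess.Popen(
        args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    ) as process:
        stdout, stderr = process.communicate(data)
        if process.returncode != 0:
            raise subprocess.CalledProcessError(
                process.returncode, process.args, output=stdout, stderr=stderr
            )
    return stdout


# Stand-in child: the Python interpreter upper-casing whatever it reads.
echo_upper = [
    sys.executable,
    "-c",
    "import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())",
]
result = run_checked(echo_upper, b"block")

# Stand-in child that fails, to show what the caller gets to see.
failing = [sys.executable, "-c", "import sys; sys.stderr.write('no key'); sys.exit(3)"]
try:
    run_checked(failing, b"")
except subprocess.CalledProcessError as error:
    failure = (error.returncode, error.stderr)
```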

delete_keys(entity_ids, *, id_format=IdFormat.STRING_UTF8)

Delete keys for a list of entities.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entity_ids | List[str] | A list of strings containing the entity ids for which the keys will be deleted. | required |
| id_format | IdFormat | The format of the entity ids. Defaults to using the utf8 encoded string as entity id. | STRING_UTF8 |

Source code in keyraser_client/client.py
def delete_keys(
    self, entity_ids: List[str], *, id_format: IdFormat = IdFormat.STRING_UTF8
):
    """
    Delete keys for a list of entities.

    Args:
        entity_ids: A list of strings containing the entity ids for which the keys
            will be deleted.
        id_format: The format of the entity ids. Defaults to using the utf8
            encoded string as entity id.
    """
    args = [binary_paths['kr-keys'], 'delete']

    if self.keystore_address is not None:
        args.extend(["-address", self.keystore_address])

    if self.credential_path is not None:
        args.extend(["-credential", self.credential_path])

    args.extend(id_format.as_cli_params())

    for entity_id in entity_ids:
        args.extend(['-id', entity_id])

    subprocess.run(args, check=True)

encrypt(format, src, dest)

Encrypts a stream of data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| format | Format | The format of the data that will be encrypted. | required |
| src | Union[str, Path, BufferedIOBase, bytes] | Either a path to a file or a stream from which the data is read to be encrypted. | required |
| dest | Union[str, Path, BufferedIOBase] | Either a path to a file or a stream to which the encrypted data will be written. | required |

Source code in keyraser_client/client.py
def encrypt(
    self,
    format: Format,
    src: Union[str, pathlib.Path, io.BufferedIOBase, bytes],
    dest: Union[str, pathlib.Path, io.BufferedIOBase],
):
    """
    Encrypts a stream of data.

    Args:
        format: The format of the data that will be encrypted.
        src: Either a path to a file or a stream from which the data is read to be
            encrypted.
        dest: Either a path to a file or a stream to which the encrypted data will
            be written.
    """
    args = [binary_paths['kr-encrypt']]

    args.extend(format.as_cli_params())

    if self.keystore_address is not None:
        args.extend(["-address", self.keystore_address])

    if self.credential_path is not None:
        args.extend(["-credential", self.credential_path])

    src_input = create_input(src)
    args.extend(src_input.cli_params())

    dest_output = create_output(dest)
    args.extend(dest_output.cli_params())

    open_kwargs: Dict = {}
    open_kwargs.update(**src_input.popen_kwargs())
    open_kwargs.update(**dest_output.popen_kwargs())
    with subprocess.Popen(args, **open_kwargs) as process:
        while not src_input.is_done() or not dest_output.is_done():
            src_input.process(process)
            dest_output.process(process)

        process.wait()
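
Because `src` accepts `bytes` and `dest` accepts a buffered stream, in-memory data can presumably be encrypted without touching the file system. The following is a minimal sketch under that assumption; `encrypt_to_bytes` is a hypothetical helper, and the use of `io.BytesIO` as destination is inferred from the `encrypt_block` listing, which hands an `io.BytesIO` to `create_output`.

```python
import io


def encrypt_to_bytes(client, data_format, payload: bytes) -> bytes:
    """Encrypt an in-memory payload and return the encrypted stream as bytes.

    `client` is expected to provide the encrypt(format, src, dest) method
    documented above; `data_format` is one of the Format subclasses.
    """
    dest = io.BytesIO()
    client.encrypt(data_format, payload, dest)
    return dest.getvalue()


# Usage with the real client (requires the keyraser binaries and a keystore):
#   encrypted = encrypt_to_bytes(client, CsvFormat(id_column='user_id'), csv_bytes)
```

Note that, unlike the block variants, the `encrypt` listing does not inspect the exit code of the binary, so an empty result may be the only sign that something went wrong.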

encrypt_block(entity_id, src, *, id_format=IdFormat.STRING_UTF8)

Encrypts a single block of data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entity_id | str | A string containing the entity id for which the data block will be encrypted. | required |
| src | bytes | A single buffer containing the bytes to encrypt. | required |
| id_format | IdFormat | The format of the entity_id. Defaults to using the utf8 encoded string as entity id. | STRING_UTF8 |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| bytes | bytes | A buffer containing the encrypted data block. |

Source code in keyraser_client/client.py
def encrypt_block(
    self, entity_id: str, src: bytes, *, id_format: IdFormat = IdFormat.STRING_UTF8
) -> bytes:
    """
    Encrypts a single block of data.

    Args:
        entity_id: A string containing the entity id for which the data block will
            be encrypted.
        src: A single buffer containing the bytes to encrypt.
        id_format: The format of the entity_id. Defaults to using the utf8
            encoded string as entity id.

    Returns:
        bytes: A buffer containing the encrypted data block.
    """
    args = [binary_paths['kr-encrypt'], 'block']

    if self.keystore_address is not None:
        args.extend(["-address", self.keystore_address])

    if self.credential_path is not None:
        args.extend(["-credential", self.credential_path])

    src_input = create_input(src)
    args.extend(src_input.cli_params())

    buffer = io.BytesIO()
    dest_output = create_output(buffer)
    args.extend(dest_output.cli_params())

    open_kwargs: Dict = {}
    open_kwargs.update(**src_input.popen_kwargs())
    open_kwargs.update(**dest_output.popen_kwargs())

    args.extend(id_format.as_cli_params())
    args.extend(['-id', entity_id])

    with subprocess.Popen(args, **open_kwargs) as process:
        if hasattr(process.stdout, 'fileno'):
            fd = process.stdout.fileno()
            fl = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        while not src_input.is_done() or not dest_output.is_done():
            src_input.process(process)
            dest_output.process(process)

        process.wait()
        if process.returncode != 0:
            raise subprocess.CalledProcessError(
                process.returncode, process.args, stderr=process.stderr
            )

    return buffer.getvalue()

set_binary_path(path, binary=None)

Define the path to use for a binary or all binaries.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | str | The path to the directory or the individual file containing the binary. | required |
| binary | Optional[str] | The name of the binary, or None if the path points to a directory. | None |

Source code in keyraser_client/client.py
def set_binary_path(path: str, binary: Optional[str] = None):
    """
    Define the path to use for a binary or all binaries.

    Args:
        path: The path to the directory or the individual file containing the binary.
        binary: The name of the binary, or `None` if the path points to a directory.
    """
    if binary is None:
        for binary in _required_binaries:
            binary_path = os.path.join(path, binary)
            _check_path(binary, binary_path)
    else:
        _check_path(binary, path)

Formats for entity ids and data streams

CsvFormat

Bases: Format

Format for CSV-files, containing comma-separated values.

In this format each data item is written on a single line that contains individual text values separated by a separator, most often ',' or ';'. This allows for easy parsing. To determine which entity an item belongs to, a column that contains the id must be specified.

The name or index of the id column and the format of the ids can be configured using the id_column and id_format attributes.

Source code in keyraser_client/format.py
class CsvFormat(Format):
    """
    Format for CSV-files, containing comma-separated values.

    In this format each data item is written on a single line that contains
    individual text values separated by a separator, most often ',' or ';'. This
    allows for easy parsing. To determine which entity an item belongs to, a
    column that contains the id must be specified.

    The name or index of the id column and the format of the ids can be configured
    using the `id_column` and `id_format` attributes.
    """

    def __init__(
        self,
        *,
        id_column: typing.Optional[str] = None,
        id_format: IdFormat = IdFormat.DEFAULT,
        separator: typing.Optional[str] = None,
        enclosing: typing.Optional[str] = None,
        contains_header: bool = False,
    ):
        """
        Create a new CsvFormat.

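        Args:
            id_column: The name or index of the column in the CSV file that
                contains the entity id.
            id_format: The `IdFormat` that defines how the entity id is encoded in
                the `id_column`.
            separator: The separator between values, passed to the binary as
                `-separator`. Defaults to `None`, in which case the flag is omitted.
            enclosing: The enclosing character for values, passed to the binary as
                `-enclosing`. Defaults to `None`, in which case the flag is omitted.
            contains_header: Whether the data starts with a header line. If `True`,
                `-header` is passed to the binary.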
        self.id_column = id_column
        self.id_format = id_format
        self.separator = separator
        self.enclosing = enclosing
        self.contains_header = contains_header
        super().__init__('csv')

    def as_cli_params(self) -> typing.List[str]:
        """
        Returns a list of command line parameters.

        Returns:
            [str]: The cli parameters when this format is used.
        """
        params = super().as_cli_params()
        if self.id_column is not None:
            params.extend(['-id', self.id_column])
        params.extend(self.id_format.as_cli_params())

        if self.separator is not None:
            params.extend(['-separator', self.separator])

        if self.enclosing is not None:
            params.extend(['-enclosing', self.enclosing])

        if self.contains_header:
            params.extend(['-header'])

        return params

__init__(*, id_column=None, id_format=IdFormat.DEFAULT, separator=None, enclosing=None, contains_header=False)

Create a new CsvFormat.

Parameters:

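| Name | Type | Description | Default |
| --- | --- | --- | --- |
| id_column | Optional[str] | The name or index of the column in the CSV file that contains the entity id. | None |
| id_format | IdFormat | The IdFormat that defines how the entity id is encoded in the id_column. | DEFAULT |
| separator | Optional[str] | The separator between values, passed to the binary as -separator. If None, the flag is omitted. | None |
| enclosing | Optional[str] | The enclosing character for values, passed to the binary as -enclosing. If None, the flag is omitted. | None |
| contains_header | bool | Whether the data starts with a header line. If True, -header is passed to the binary. | False |

Source code in keyraser_client/format.py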
def __init__(
    self,
    *,
    id_column: typing.Optional[str] = None,
    id_format: IdFormat = IdFormat.DEFAULT,
    separator: typing.Optional[str] = None,
    enclosing: typing.Optional[str] = None,
    contains_header: bool = False,
):
    """
    Create a new CsvFormat.

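    Args:
        id_column: The name or index of the column in the CSV file that
            contains the entity id.
        id_format: The `IdFormat` that defines how the entity id is encoded in
            the `id_column`.
        separator: The separator between values, passed to the binary as
            `-separator`. Defaults to `None`, in which case the flag is omitted.
        enclosing: The enclosing character for values, passed to the binary as
            `-enclosing`. Defaults to `None`, in which case the flag is omitted.
        contains_header: Whether the data starts with a header line. If `True`,
            `-header` is passed to the binary.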
    self.id_column = id_column
    self.id_format = id_format
    self.separator = separator
    self.enclosing = enclosing
    self.contains_header = contains_header
    super().__init__('csv')

as_cli_params()

Returns a list of command line parameters.

Returns:

| Type | Description |
| --- | --- |
| List[str] | The cli parameters when this format is used. |

Source code in keyraser_client/format.py
def as_cli_params(self) -> typing.List[str]:
    """
    Returns a list of command line parameters.

    Returns:
        [str]: The cli parameters when this format is used.
    """
    params = super().as_cli_params()
    if self.id_column is not None:
        params.extend(['-id', self.id_column])
    params.extend(self.id_format.as_cli_params())

    if self.separator is not None:
        params.extend(['-separator', self.separator])

    if self.enclosing is not None:
        params.extend(['-enclosing', self.enclosing])

    if self.contains_header:
        params.extend(['-header'])

    return params
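
To make the resulting command line concrete, the parameter assembly above can be mirrored in a stand-alone function. This is an illustrative sketch rather than the library's API: `csv_cli_params` is a hypothetical name, the id format is given as a plain string instead of an `IdFormat` member, and the column name `user_id` is made up.

```python
from typing import List, Optional


def csv_cli_params(
    *,
    id_column: Optional[str] = None,
    id_format: Optional[str] = None,
    separator: Optional[str] = None,
    enclosing: Optional[str] = None,
    contains_header: bool = False,
) -> List[str]:
    """Build the parameter list in the same order as the listing above."""
    params = ["csv"]  # the type id set by the constructor
    if id_column is not None:
        params.extend(["-id", id_column])
    if id_format is not None:  # None plays the role of IdFormat.DEFAULT
        params.extend(["-idFormat", id_format])
    if separator is not None:
        params.extend(["-separator", separator])
    if enclosing is not None:
        params.extend(["-enclosing", enclosing])
    if contains_header:
        params.append("-header")
    return params


params = csv_cli_params(
    id_column="user_id",
    id_format="STRING-UTF8",
    separator=";",
    contains_header=True,
)
print(params)
```

With the library, the equivalent would be `CsvFormat(id_column='user_id', id_format=IdFormat.STRING_UTF8, separator=';', contains_header=True).as_cli_params()`.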

Format

Base class for data formats that can be encrypted.

Source code in keyraser_client/format.py
class Format:
    """
    Base class for data formats that can be encrypted.
    """

    def __init__(self, type_id: str):
        self.type_id = type_id

    def as_cli_params(self) -> typing.List[str]:
        """
        Returns a list of command line parameters.

        Returns:
            [str]: The cli parameters when this format is used.
        """
        return [self.type_id]

as_cli_params()

Returns a list of command line parameters.

Returns:

| Type | Description |
| --- | --- |
| List[str] | The cli parameters when this format is used. |

Source code in keyraser_client/format.py
def as_cli_params(self) -> typing.List[str]:
    """
    Returns a list of command line parameters.

    Returns:
        [str]: The cli parameters when this format is used.
    """
    return [self.type_id]

IdFormat

Bases: Enum

Enum type that represents the format of entity ids.

Source code in keyraser_client/format.py
class IdFormat(enum.Enum):
    """
    Enum type that represents the format of entity ids.
    """

    HEX = "HEX"
    """Entity ids are represented as a string of hexadecimal characters."""

    STRING_UTF8 = "STRING-UTF8"
    """Entity ids are UTF-8 encoded strings."""

    DEFAULT = "DEFAULT"
    """Use the default format of the binary."""

    def as_cli_params(self) -> typing.List[str]:
        """
        Returns a list of command line parameters.

        Returns:
            [str]: The cli parameters when this format is used.
        """
        if self == IdFormat.DEFAULT:
            return []

        return ['-idFormat', self.value]

DEFAULT = 'DEFAULT' class-attribute instance-attribute 🔗

Use the default format of the binary.

HEX = 'HEX' class-attribute instance-attribute 🔗

Entity ids are represented as a string of hexadecimal characters.

STRING_UTF8 = 'STRING-UTF8' class-attribute instance-attribute 🔗

Entity ids are UTF-8 encoded strings.

as_cli_params() 🔗

Returns a list of command line parameters.

Returns:

| Type | Description |
| --- | --- |
| List[str] | The cli parameters when this format is used. |

Source code in keyraser_client/format.py
def as_cli_params(self) -> typing.List[str]:
    """
    Returns a list of command line parameters.

    Returns:
        [str]: The cli parameters when this format is used.
    """
    if self == IdFormat.DEFAULT:
        return []

    return ['-idFormat', self.value]
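
The mapping from enum members to command line parameters can be checked with a short standalone sketch. The `IdFormat` class below is a stand-in copied from the listing above so that the snippet runs without the library installed; with the package available one would presumably import it from `keyraser_client.format` instead.

```python
import enum
import typing


class IdFormat(enum.Enum):
    """Stand-in copied from the listing above so the snippet runs on its own."""

    HEX = "HEX"
    STRING_UTF8 = "STRING-UTF8"
    DEFAULT = "DEFAULT"

    def as_cli_params(self) -> typing.List[str]:
        if self == IdFormat.DEFAULT:
            return []
        return ['-idFormat', self.value]


# Iteration follows definition order: HEX, STRING_UTF8, DEFAULT.
for member in IdFormat:
    print(member.name, member.as_cli_params())
# HEX ['-idFormat', 'HEX']
# STRING_UTF8 ['-idFormat', 'STRING-UTF8']
# DEFAULT []

# Lookup by value uses the hyphenated string, not the member name.
assert IdFormat("STRING-UTF8") is IdFormat.STRING_UTF8
```

Note that the member name uses an underscore (`STRING_UTF8`) while the value passed to the binary uses a hyphen (`STRING-UTF8`).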

NdjsonFormat 🔗

Bases: Format

Format for newline-delimited JSON.

In this format, each data item is written on its own line as a JSON object. To determine which entity an item belongs to, every JSON object must use the same property to hold the entity id.

The name of the id property and the format of the ids can be configured using the id_property and id_format attributes.
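
To make the expected data shape concrete, here is a minimal sketch that groups newline-delimited JSON items by their id property. It only illustrates the layout described above and is not how the keyraser binaries process data; the property name `user_id`, the sample records, and the helper `group_by_entity` are assumptions made for this example.

```python
import json
import typing

# Three hypothetical NDJSON records; "user_id" is an assumed property name.
ndjson_text = "\n".join([
    '{"user_id": "alice", "event": "login"}',
    '{"user_id": "bob", "event": "purchase"}',
    '{"user_id": "alice", "event": "logout"}',
])


def group_by_entity(
    text: str, id_property: str
) -> typing.Dict[str, typing.List[dict]]:
    """Group NDJSON items by the value of their id property."""
    groups: typing.Dict[str, typing.List[dict]] = {}
    for line_number, line in enumerate(text.splitlines(), start=1):
        item = json.loads(line)
        # Every object must carry the same id property.
        if id_property not in item:
            raise ValueError(f"line {line_number} lacks {id_property!r}")
        groups.setdefault(item[id_property], []).append(item)
    return groups


groups = group_by_entity(ndjson_text, "user_id")
print({entity: len(items) for entity, items in groups.items()})
# {'alice': 2, 'bob': 1}
```

Under the same assumptions, `NdjsonFormat(id_property='user_id', id_format=IdFormat.STRING_UTF8)` would describe this data. According to the `as_cli_params()` source shown for this class, it yields `['ndjson', '-id', 'user_id', '-idFormat', 'STRING-UTF8']`.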

Source code in keyraser_client/format.py
class NdjsonFormat(Format):
    """
    Format for newline-delimited JSON.

    In this format, each data item is written on its own line as a JSON object.
    To determine which entity an item belongs to, every JSON object must use the
    same property to hold the entity id.

    The name of the id property and the format of the ids can be configured using
    the `id_property` and `id_format` attributes.
    """

    def __init__(
        self,
        *,
        id_property: typing.Optional[str] = None,
        id_format: IdFormat = IdFormat.DEFAULT,
    ):
        """
        Create a new NdjsonFormat.

        Args:
            id_property: The name of the property in the JSON object that contains the
                entity id.
            id_format: The `IdFormat` that defines how the entity id is encoded in
                the `id_property`.
        """
        self.id_property = id_property
        self.id_format = id_format
        super().__init__('ndjson')

    def as_cli_params(self) -> typing.List[str]:
        """
        Returns a list of command line parameters.

        Returns:
            [str]: The cli parameters when this format is used.
        """
        params = super().as_cli_params()
        if self.id_property is not None:
            params.extend(['-id', self.id_property])
        params.extend(self.id_format.as_cli_params())
        return params

__init__(*, id_property=None, id_format=IdFormat.DEFAULT) 🔗

Create a new NdjsonFormat.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| id_property | Optional[str] | The name of the property in the JSON object that contains the entity id. | None |
| id_format | IdFormat | The IdFormat that defines how the entity id is encoded in the id_property. | DEFAULT |

Source code in keyraser_client/format.py
def __init__(
    self,
    *,
    id_property: typing.Optional[str] = None,
    id_format: IdFormat = IdFormat.DEFAULT,
):
    """
    Create a new NdjsonFormat.

    Args:
        id_property: The name of the property in the JSON object that contains the
            entity id.
        id_format: The `IdFormat` that defines how the entity id is encoded in
            the `id_property`.
    """
    self.id_property = id_property
    self.id_format = id_format
    super().__init__('ndjson')

as_cli_params() 🔗

Returns a list of command line parameters.

Returns:

| Type | Description |
| --- | --- |
| List[str] | The cli parameters when this format is used. |

Source code in keyraser_client/format.py
def as_cli_params(self) -> typing.List[str]:
    """
    Returns a list of command line parameters.

    Returns:
        [str]: The cli parameters when this format is used.
    """
    params = super().as_cli_params()
    if self.id_property is not None:
        params.extend(['-id', self.id_property])
    params.extend(self.id_format.as_cli_params())
    return params

SingleFormat 🔗

Bases: Format

Format for encrypting a whole file for a single entity.

In this format the whole data is encrypted for a single entity that is defined as a command line parameter.

The entity id and the format of the id can be configured using the entity_id and id_format attributes.
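
The following sketch shows which parameter lists result from two configurations. The three classes are condensed stand-ins based on the source listings on this page, so the snippet runs without the library installed; the entity ids `customer-42` and `0a1b2c` are hypothetical values.

```python
import enum
import typing


class IdFormat(enum.Enum):
    HEX = "HEX"
    STRING_UTF8 = "STRING-UTF8"
    DEFAULT = "DEFAULT"

    def as_cli_params(self) -> typing.List[str]:
        return [] if self == IdFormat.DEFAULT else ['-idFormat', self.value]


class Format:
    def __init__(self, type_id: str):
        self.type_id = type_id

    def as_cli_params(self) -> typing.List[str]:
        return [self.type_id]


class SingleFormat(Format):
    def __init__(self, entity_id: str, *, id_format: IdFormat = IdFormat.DEFAULT):
        self.entity_id = entity_id
        self.id_format = id_format
        super().__init__('single')

    def as_cli_params(self) -> typing.List[str]:
        params = super().as_cli_params()
        params.extend(['-id', self.entity_id])
        params.extend(self.id_format.as_cli_params())
        return params


# "customer-42" and "0a1b2c" are hypothetical entity ids.
default_params = SingleFormat('customer-42').as_cli_params()
hex_params = SingleFormat('0a1b2c', id_format=IdFormat.HEX).as_cli_params()
print(default_params)  # ['single', '-id', 'customer-42']
print(hex_params)      # ['single', '-id', '0a1b2c', '-idFormat', 'HEX']
```

Unlike the other formats, `entity_id` is required and `-id` is always emitted. Because of the bare `*` in the signature, `id_format` has to be passed by keyword.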

Source code in keyraser_client/format.py
class SingleFormat(Format):
    """
    Format for encrypting a whole file for a single entity.

    In this format the whole data is encrypted for a single entity that is defined
    as a command line parameter.

    The entity id and the format of the id can be configured using the `entity_id` and
    `id_format` attributes.
    """

    def __init__(
        self,
        entity_id: str,
        *,
        id_format: IdFormat = IdFormat.DEFAULT,
    ):
        """
        Create a new SingleFormat.

        Args:
            entity_id: The entity_id which is used for encrypting the data.
            id_format: The `IdFormat` that defines how the entity id is encoded in
                the `entity_id`.
        """
        self.entity_id = entity_id
        self.id_format = id_format
        super().__init__('single')

    def as_cli_params(self) -> typing.List[str]:
        """
        Returns a list of command line parameters.

        Returns:
            [str]: The cli parameters when this format is used.
        """
        params = super().as_cli_params()
        params.extend(['-id', self.entity_id])
        params.extend(self.id_format.as_cli_params())
        return params

__init__(entity_id, *, id_format=IdFormat.DEFAULT) 🔗

Create a new SingleFormat.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entity_id | str | The entity_id which is used for encrypting the data. | required |
| id_format | IdFormat | The IdFormat that defines how the entity id is encoded in the entity_id. | DEFAULT |

Source code in keyraser_client/format.py
def __init__(
    self,
    entity_id: str,
    *,
    id_format: IdFormat = IdFormat.DEFAULT,
):
    """
    Create a new SingleFormat.

    Args:
        entity_id: The entity_id which is used for encrypting the data.
        id_format: The `IdFormat` that defines how the entity id is encoded in
            the `entity_id`.
    """
    self.entity_id = entity_id
    self.id_format = id_format
    super().__init__('single')

as_cli_params() 🔗

Returns a list of command line parameters.

Returns:

| Type | Description |
| --- | --- |
| List[str] | The cli parameters when this format is used. |

Source code in keyraser_client/format.py
def as_cli_params(self) -> typing.List[str]:
    """
    Returns a list of command line parameters.

    Returns:
        [str]: The cli parameters when this format is used.
    """
    params = super().as_cli_params()
    params.extend(['-id', self.entity_id])
    params.extend(self.id_format.as_cli_params())
    return params