Skip to content

OTEAPI DLite Plugin Strategies

This page provides documentation for the oteapi_dlite.strategies submodule, where all the OTEAPI DLite Plugin strategies are located.

These strategies will be available when setting up a server in an environment with oteapi-dlite installed.

convert

Generic function strategy that converts zero or more input instances to zero or more new output instances.

DLiteConvertConfig

Bases: FunctionConfig

DLite convert strategy resource config.

Source code in oteapi_dlite/strategies/convert.py
125
126
127
128
129
130
131
class DLiteConvertConfig(FunctionConfig):
    """Function config for the DLite convert strategy.

    Extends the generic `FunctionConfig` by typing its `configuration`
    field as a `DLiteConvertStrategyConfig`.
    """

    # Settings consumed by `DLiteConvertStrategy`.
    configuration: Annotated[
        DLiteConvertStrategyConfig,
        Field(
            description="DLite convert strategy-specific configuration.",
        ),
    ]

configuration: Annotated[DLiteConvertStrategyConfig, Field(description='DLite convert strategy-specific configuration.')] instance-attribute

DLiteConvertInputConfig

Bases: AttrDict

Configuration for input instance to generic DLite converter.

At least one of label or datamodel should be given.

Source code in oteapi_dlite/strategies/convert.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class DLiteConvertInputConfig(AttrDict):
    """Describe one input instance for the generic DLite converter.

    An input is selected by its `label`, by its `datamodel`, or by both.
    At least one of `label` or `datamodel` should be given.
    """

    # Label of the input instance.
    label: Annotated[
        Optional[str], Field(description="Label of the instance.")
    ] = None

    # URI of the data model of the input instance.
    datamodel: Annotated[
        Optional[str], Field(description="URI of data model.")
    ] = None

    # Whether the instance may be inferred from property mappings.
    property_mappings: Annotated[
        bool,
        Field(
            description="Whether to infer instance from property mappings.",
        ),
    ] = False

datamodel: Annotated[Optional[str], Field(description='URI of data model.')] = None class-attribute instance-attribute

label: Annotated[Optional[str], Field(description='Label of the instance.')] = None class-attribute instance-attribute

property_mappings: Annotated[bool, Field(description='Whether to infer instance from property mappings.')] = False class-attribute instance-attribute

DLiteConvertOutputConfig

Bases: AttrDict

Configuration for output instance to generic DLite converter.

Source code in oteapi_dlite/strategies/convert.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class DLiteConvertOutputConfig(AttrDict):
    """Describe one output instance of the generic DLite converter."""

    # Label under which the produced instance is stored.
    label: Annotated[
        Optional[str],
        Field(description="Label to use when storing the instance."),
    ] = None

    # Data model URI of the output; informational only.
    datamodel: Annotated[
        Optional[str],
        Field(description="URI of data model.  Used for documentation."),
    ] = None

datamodel: Annotated[Optional[str], Field(description='URI of data model. Used for documentation.')] = None class-attribute instance-attribute

label: Annotated[Optional[str], Field(description='Label to use when storing the instance.')] = None class-attribute instance-attribute

DLiteConvertStrategy

Generic DLite convert strategy for converting zero or more input instances to zero or more output instances.

Registers strategies:

  • ("functionType", "application/vnd.dlite-convert")
Source code in oteapi_dlite/strategies/convert.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
@dataclass
class DLiteConvertStrategy:
    """Generic DLite convert strategy for converting zero or more input
    instances to zero or more output instances.

    **Registers strategies**:

    - `("functionType", "application/vnd.dlite-convert")`

    """

    function_config: DLiteConvertConfig

    def initialize(self) -> DLiteResult:
        """Initialize.

        Returns:
            A `DLiteResult` holding the UUID of the configured collection.
        """
        return DLiteResult(
            collection_id=get_collection(
                self.function_config.configuration.collection_id
            ).uuid
        )

    def get(self) -> DLiteResult:
        """Execute the strategy.

        This method will be called through the strategy-specific endpoint
        of the OTE-API Services.

        The configured convert function is imported, called with the input
        instances as positional arguments and the configured `kwargs` as
        keyword arguments, and its outputs are added to the collection under
        the labels given in the `outputs` configuration.

        Returns:
            A `DLiteResult` holding the UUID of the updated collection.

        Raises:
            ValueError: If an input specifies neither `label` nor
                `datamodel`.
        """
        config = self.function_config.configuration
        # NOTE(review): module and function names are taken verbatim from
        # the configuration, so this imports and runs whatever it names.
        # Confirm that configurations only come from trusted sources.
        module = importlib.import_module(config.module_name, config.package)
        function = getattr(module, config.function_name)
        # `kwargs` is declared Optional; `**None` would raise TypeError.
        kwargs = config.kwargs or {}

        coll = get_collection(config.collection_id)

        instances = []
        for i, input_config in enumerate(config.inputs):
            if input_config.label:
                instances.append(
                    coll.get(input_config.label, input_config.datamodel)
                )
            elif input_config.datamodel:
                # NOTE(review): the result of `get_instances()` is passed on
                # as a single argument -- confirm whether the converter
                # expects one instance or all matching instances here.
                inst = coll.get_instances(
                    metaid=input_config.datamodel,
                    property_mappings=input_config.property_mappings,
                    # More to do: add more arguments...
                )
                instances.append(inst)
            else:
                raise ValueError(
                    "either `label` or `datamodel` must be specified in "
                    f"inputs[{i}]"
                )
        outputs = function(*instances, **kwargs)
        if outputs is None:
            # A converter producing no outputs may simply return None.
            outputs = []
        elif isinstance(outputs, dlite.Instance):
            # Allow a converter to return a bare instance.
            outputs = [outputs]

        # Outputs are paired with their configuration by position.
        for inst, output_config in zip(outputs, config.outputs):
            coll.add(output_config.label, inst)

        update_collection(coll)
        return DLiteResult(collection_id=coll.uuid)

function_config: DLiteConvertConfig instance-attribute

get()

Execute the strategy.

This method will be called through the strategy-specific endpoint of the OTE-API Services.

Returns:

Type Description
DLiteResult

A DLiteResult instance holding the UUID of the updated collection.

Source code in oteapi_dlite/strategies/convert.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def get(self) -> DLiteResult:
    """Execute the strategy.

    This method will be called through the strategy-specific endpoint
    of the OTE-API Services.

    The configured convert function is imported, called with the input
    instances as positional arguments and the configured `kwargs` as
    keyword arguments, and its outputs are added to the collection under
    the labels given in the `outputs` configuration.

    Returns:
        A `DLiteResult` holding the UUID of the updated collection.

    Raises:
        ValueError: If an input specifies neither `label` nor `datamodel`.
    """
    config = self.function_config.configuration
    # NOTE(review): module and function names are taken verbatim from the
    # configuration, so this imports and runs whatever it names.  Confirm
    # that configurations only come from trusted sources.
    module = importlib.import_module(config.module_name, config.package)
    function = getattr(module, config.function_name)
    # `kwargs` is declared Optional; `**None` would raise TypeError.
    kwargs = config.kwargs or {}

    coll = get_collection(config.collection_id)

    instances = []
    for i, input_config in enumerate(config.inputs):
        if input_config.label:
            instances.append(
                coll.get(input_config.label, input_config.datamodel)
            )
        elif input_config.datamodel:
            # NOTE(review): the result of `get_instances()` is passed on as
            # a single argument -- confirm whether the converter expects one
            # instance or all matching instances here.
            inst = coll.get_instances(
                metaid=input_config.datamodel,
                property_mappings=input_config.property_mappings,
                # More to do: add more arguments...
            )
            instances.append(inst)
        else:
            raise ValueError(
                "either `label` or `datamodel` must be specified in "
                f"inputs[{i}]"
            )
    outputs = function(*instances, **kwargs)
    if outputs is None:
        # A converter producing no outputs may simply return None.
        outputs = []
    elif isinstance(outputs, dlite.Instance):
        # Allow a converter to return a bare instance.
        outputs = [outputs]

    # Outputs are paired with their configuration by position.
    for inst, output_config in zip(outputs, config.outputs):
        coll.add(output_config.label, inst)

    update_collection(coll)
    return DLiteResult(collection_id=coll.uuid)

initialize()

Initialize.

Source code in oteapi_dlite/strategies/convert.py
147
148
149
150
151
152
153
def initialize(self) -> DLiteResult:
    """Look up the configured collection and report its UUID.

    Returns:
        A `DLiteResult` whose `collection_id` is the UUID of the collection
        returned by `get_collection()` for the configured collection id.
    """
    configured_id = self.function_config.configuration.collection_id
    collection = get_collection(configured_id)
    return DLiteResult(collection_id=collection.uuid)

DLiteConvertStrategyConfig

Bases: DLiteResult

Configuration for generic DLite converter.

Source code in oteapi_dlite/strategies/convert.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
class DLiteConvertStrategyConfig(DLiteResult):
    """Configuration for generic DLite converter.

    Names the convert function to import (`module_name`, `function_name`
    and optionally `package`), the input instances passed to it, the output
    instances it produces, and any extra keyword arguments.
    """

    # Attribute name of the convert function inside `module_name`.
    function_name: Annotated[
        str,
        Field(
            description="Name of convert function.  It will be passed the input "
            "instances as arguments and should return a sequence of output "
            "instances.",
        ),
    ]
    # Module that is imported to find the convert function.
    module_name: Annotated[
        str,
        Field(
            description=(
                "Name of Python module containing the conversion function."
            ),
        ),
    ]
    # Anchor package, only needed when `module_name` is a relative import.
    package: Annotated[
        Optional[str],
        Field(
            description=(
                "Used when performing a relative import of the converter "
                "function.  It specifies the package to use as the anchor "
                "point from which to resolve the relative import to an absolute"
                " import."
            ),
        ),
    ] = None
    # Informational only for now.
    pypi_package: Annotated[
        Optional[str],
        Field(
            description=(
                "Package name on PyPI.  This field is currently only "
                "informative, but might be used in the future for automatic "
                "package installation."
            ),
        ),
    ] = None
    # Input instances, in the order they are passed to the convert function.
    inputs: Annotated[
        Sequence[DLiteConvertInputConfig],
        Field(
            description="Input instances.",
        ),
    ] = []
    # Output instances, in the order the convert function returns them.
    outputs: Annotated[
        Sequence[DLiteConvertOutputConfig],
        Field(
            description="Output instances.",
        ),
    ] = []
    # Extra keyword arguments for the convert function; may be None.
    kwargs: Annotated[
        Optional[dict],
        Field(
            description="Additional keyword arguments passed "
            "to the convert function.",
        ),
    ] = {}  # noqa: RUF012

function_name: Annotated[str, Field(description='Name of convert function. It will be passed the input instances as arguments and should return a sequence of output instances.')] instance-attribute

inputs: Annotated[Sequence[DLiteConvertInputConfig], Field(description='Input instances.')] = [] class-attribute instance-attribute

kwargs: Annotated[Optional[dict], Field(description='Additional keyword arguments passed to the convert function.')] = {} class-attribute instance-attribute

module_name: Annotated[str, Field(description='Name of Python module containing the conversion function.')] instance-attribute

outputs: Annotated[Sequence[DLiteConvertOutputConfig], Field(description='Output instances.')] = [] class-attribute instance-attribute

package: Annotated[Optional[str], Field(description='Used when performing a relative import of the converter function. It specifies the package to use as the anchor point from which to resolve the relative import to an absolute import.')] = None class-attribute instance-attribute

pypi_package: Annotated[Optional[str], Field(description='Package name on PyPI. This field is currently only informative, but might be used in the future for automatic package installation.')] = None class-attribute instance-attribute

filter

Filter that removes all but specified instances in the collection.

DLiteFilterConfig

Bases: FilterConfig

DLite filter strategy config.

Source code in oteapi_dlite/strategies/filter.py
79
80
81
82
83
84
85
class DLiteFilterConfig(FilterConfig):
    """DLite filter strategy config."""

    # Settings consumed by `DLiteFilterStrategy`.
    configuration: Annotated[
        DLiteQueryConfig,
        Field(description="DLite filter strategy-specific configuration."),
    ]

configuration: Annotated[DLiteQueryConfig, Field(description='DLite filter strategy-specific configuration.')] instance-attribute

DLiteFilterStrategy

Filter that removes all but specified instances in the collection.

The query configuration should be a regular expression matching labels to keep in the collection. All other labels will be removed.

Registers strategies:

  • ("filterType", "application/vnd.dlite-filter")
Source code in oteapi_dlite/strategies/filter.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
@dataclass
class DLiteFilterStrategy:
    """Filter that removes all but specified instances in the collection.

    The `query` configuration should be a regular expression matching labels
    to keep in the collection.  All other labels will be removed.

    **Registers strategies**:

    - `("filterType", "application/vnd.dlite-filter")`

    """

    filter_config: DLiteFilterConfig

    def initialize(self) -> DLiteResult:
        """Initialize.

        Returns:
            A `DLiteResult` holding the UUID of the configured collection.
        """
        return DLiteResult(
            collection_id=get_collection(
                self.filter_config.configuration.collection_id
            ).uuid
        )

    def get(self) -> DLiteResult:
        """Execute the strategy.

        Instances are first marked for removal, then unmarked according to
        the `keep_*` configurations, and whatever is still marked is removed
        from the collection.

        Returns:
            A `DLiteResult` holding the UUID of the updated collection.
        """
        config = self.filter_config.configuration

        # Alias for query configuration
        keep_label = (
            config.keep_label if config.keep_label else self.filter_config.query
        )

        instdict = {}  # Map instance labels to [uuid, metaURI]
        coll = get_collection(config.collection_id)
        for s, _, o in coll.get_relations(p="_has-uuid"):
            instdict[s] = [o]
        for s, _, o in coll.get_relations(p="_has-meta"):
            instdict[s].append(o)

        removal = set()  # Labels marked for removal

        # 1: remove_label, remove_datamodel
        if config.remove_label or config.remove_datamodel:
            for label, (_, metauri) in instdict.items():
                if config.remove_label and re.match(config.remove_label, label):
                    removal.add(label)

                if config.remove_datamodel and re.match(
                    config.remove_datamodel, metauri
                ):
                    removal.add(label)
        else:
            # No removal criteria given: start by marking everything.
            removal.update(instdict.keys())

        # 2: keep_label, keep_datamodel
        # Iterate over a copy since `removal` is modified in the loop.
        for label in set(removal):
            # Use discard() rather than remove(): a label matching both
            # `keep_label` and `keep_datamodel` is unmarked twice, and a
            # second remove() would raise KeyError.
            if keep_label and re.match(keep_label, label):
                removal.discard(label)

            _, metauri = instdict[label]
            if config.keep_datamodel and re.match(
                config.keep_datamodel, metauri
            ):
                removal.discard(label)

        # 3: keep_referred
        if config.keep_referred:
            labels = {uuid: label for label, (uuid, _) in instdict.items()}
            kept = set(instdict.keys()).difference(removal)
            for label in kept:
                # Unmark instances of this collection that are reported by
                # get_referred_instances() for a kept instance.
                removal.difference_update(
                    labels[inst.uuid]
                    for inst in get_referred_instances(coll.get(label))
                    if inst.uuid in labels
                )

        # 4: remove from collection
        for label in removal:
            coll.remove(label)

        update_collection(coll)
        return DLiteResult(collection_id=coll.uuid)

filter_config: DLiteFilterConfig instance-attribute

get()

Execute the strategy.

Source code in oteapi_dlite/strategies/filter.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def get(self) -> DLiteResult:
    """Execute the strategy.

    Instances are first marked for removal, then unmarked according to the
    `keep_*` configurations, and whatever is still marked is removed from
    the collection.

    Returns:
        A `DLiteResult` holding the UUID of the updated collection.
    """
    config = self.filter_config.configuration

    # Alias for query configuration
    keep_label = (
        config.keep_label if config.keep_label else self.filter_config.query
    )

    instdict = {}  # Map instance labels to [uuid, metaURI]
    coll = get_collection(config.collection_id)
    for s, _, o in coll.get_relations(p="_has-uuid"):
        instdict[s] = [o]
    for s, _, o in coll.get_relations(p="_has-meta"):
        instdict[s].append(o)

    removal = set()  # Labels marked for removal

    # 1: remove_label, remove_datamodel
    if config.remove_label or config.remove_datamodel:
        for label, (_, metauri) in instdict.items():
            if config.remove_label and re.match(config.remove_label, label):
                removal.add(label)

            if config.remove_datamodel and re.match(
                config.remove_datamodel, metauri
            ):
                removal.add(label)
    else:
        # No removal criteria given: start by marking everything.
        removal.update(instdict.keys())

    # 2: keep_label, keep_datamodel
    # Iterate over a copy since `removal` is modified in the loop.
    for label in set(removal):
        # Use discard() rather than remove(): a label matching both
        # `keep_label` and `keep_datamodel` is unmarked twice, and a second
        # remove() would raise KeyError.
        if keep_label and re.match(keep_label, label):
            removal.discard(label)

        _, metauri = instdict[label]
        if config.keep_datamodel and re.match(
            config.keep_datamodel, metauri
        ):
            removal.discard(label)

    # 3: keep_referred
    if config.keep_referred:
        labels = {uuid: label for label, (uuid, _) in instdict.items()}
        kept = set(instdict.keys()).difference(removal)
        for label in kept:
            # Unmark instances of this collection that are reported by
            # get_referred_instances() for a kept instance.
            removal.difference_update(
                labels[inst.uuid]
                for inst in get_referred_instances(coll.get(label))
                if inst.uuid in labels
            )

    # 4: remove from collection
    for label in removal:
        coll.remove(label)

    update_collection(coll)
    return DLiteResult(collection_id=coll.uuid)

initialize()

Initialize.

Source code in oteapi_dlite/strategies/filter.py
103
104
105
106
107
108
109
def initialize(self) -> DLiteResult:
    """Look up the configured collection and report its UUID.

    Returns:
        A `DLiteResult` whose `collection_id` is the UUID of the collection
        returned by `get_collection()` for the configured collection id.
    """
    configured_id = self.filter_config.configuration.collection_id
    collection = get_collection(configured_id)
    return DLiteResult(collection_id=collection.uuid)

DLiteQueryConfig

Bases: DLiteResult

Configuration for the DLite filter strategy.

First, the remove_label and remove_datamodel configurations are used to mark matching instances for removal. If neither remove_label nor remove_datamodel is given, all instances are marked for removal.

Then instances matching keep_label and keep_datamodel are unmarked for removal.

If keep_referred is true, any instance that is referred to by an instance not marked for removal is also unmarked for removal.

Finally, the instances that are still marked for removal are removed from the collection.

Source code in oteapi_dlite/strategies/filter.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class DLiteQueryConfig(DLiteResult):
    """Configuration for the DLite filter strategy.

    The filter works in four steps:

    1. Instances matching `remove_label` or `remove_datamodel` are marked
       for removal.  If neither of the two is given, all instances are
       marked for removal.
    2. Instances matching `keep_label` or `keep_datamodel` are unmarked
       for removal.
    3. If `keep_referred` is true, any instance that is referred to by an
       instance not marked for removal is also unmarked for removal.
    4. The instances that are still marked for removal are removed from
       the collection.
    """

    # --- step 1: what to mark for removal ---
    remove_label: Annotated[
        Optional[str],
        Field(
            description="Regular expression matching labels to remove.",
        ),
    ] = None
    remove_datamodel: Annotated[
        Optional[str],
        Field(description="Regular expression matching datamodel URIs to remove."),
    ] = None

    # --- step 2: what to unmark again ---
    keep_label: Annotated[
        Optional[str],
        Field(
            description="Regular expression matching labels to keep. This "
            "configuration overrides `remove_label` and "
            "`remove_datamodel`. Alias for the FilterStrategy `query` "
            "configuration, that is inherited from the oteapi-core Filter "
            "data model.",
        ),
    ] = None
    keep_datamodel: Annotated[
        Optional[str],
        Field(
            description="Regular expression matching datamodel URIs to keep in "
            "collection. This configuration overrides `remove_label` and "
            "`remove_datamodel`.",
        ),
    ] = None

    # --- step 3: follow references from kept instances ---
    keep_referred: Annotated[
        bool,
        Field(
            description="Whether to keep all instances in the collection that are "
            "directly or indirectly referred to (via ref-types or "
            "collections) by kept instances.",
        ),
    ] = True

keep_datamodel: Annotated[Optional[str], Field(description='Regular expression matching datamodel URIs to keep in collection. This configuration overrides `remove_label` and `remove_datamodel`.')] = None class-attribute instance-attribute

keep_label: Annotated[Optional[str], Field(description='Regular expression matching labels to keep. This configuration overrides `remove_label` and `remove_datamodel`. Alias for the FilterStrategy `query` configuration, that is inherited from the oteapi-core Filter data model.')] = None class-attribute instance-attribute

keep_referred: Annotated[bool, Field(description='Whether to keep all instances in the collection that are directly or indirectly referred to (via ref-types or collections) by kept instances.')] = True class-attribute instance-attribute

remove_datamodel: Annotated[Optional[str], Field(description='Regular expression matching datamodel URIs to remove.')] = None class-attribute instance-attribute

remove_label: Annotated[Optional[str], Field(description='Regular expression matching labels to remove.')] = None class-attribute instance-attribute

generate

Generic generate strategy using DLite storage plugin.

hasInput = 'https://w3id.org/emmo#EMMO_36e69413_8c59_4799_946c_10b05d266e22' module-attribute

hasOutput = 'https://w3id.org/emmo#EMMO_c4bace1d_4db0_4cd3_87e9_18122bae2840' module-attribute

DLiteGenerateConfig

Bases: FunctionConfig

DLite generate strategy config.

Source code in oteapi_dlite/strategies/generate.py
236
237
238
239
240
241
242
class DLiteGenerateConfig(FunctionConfig):
    """Function config for the DLite generate strategy.

    Extends the generic `FunctionConfig` by typing its `configuration`
    field as a `DLiteStorageConfig`.
    """

    # Settings consumed by `DLiteGenerateStrategy`.
    configuration: Annotated[
        DLiteStorageConfig,
        Field(
            description="DLite generate strategy-specific configuration.",
        ),
    ]

configuration: Annotated[DLiteStorageConfig, Field(description='DLite generate strategy-specific configuration.')] instance-attribute

DLiteGenerateStrategy

Generic DLite generate strategy utilising DLite storage plugins.

Registers strategies:

  • ("mediaType", "application/vnd.dlite-generate")
Source code in oteapi_dlite/strategies/generate.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
@dataclass
class DLiteGenerateStrategy:
    """Generic DLite generate strategy utilising DLite storage plugins.

    **Registers strategies**:

    - `("mediaType", "application/vnd.dlite-generate")`

    """

    function_config: DLiteGenerateConfig

    def initialize(self) -> DLiteResult:
        """Initialize.

        Returns:
            DLiteResult instance with the UUID of the collection looked up
            from the configured `collection_id`.
        """
        return DLiteResult(
            collection_id=get_collection(
                self.function_config.configuration.collection_id
            ).uuid
        )

    def get(self) -> DLiteResult:
        """Execute the strategy.

        This method will be called through the strategy-specific endpoint
        of the OTE-API Services.

        The instance to store is selected with `label`, `datamodel` or
        `store_collection` (checked in that order).  It is saved to
        `location` if given, otherwise to the data cache.  If
        `kb_document_class` is set, the stored instance is also documented
        in the knowledge base.

        Returns:
            DLiteResult instance with the UUID of the collection.

        Raises:
            ValueError: If none of `label`, `datamodel` or
                `store_collection` is configured, if no instance of
                `datamodel` can be obtained from the collection, or if
                the "tripper.triplestore" setting is not a dict.
            KBError: If the knowledge base holds no individual of an
                input class of `kb_document_computation`.
        """
        config = self.function_config.configuration
        cacheconfig = config.datacache_config

        # An explicit driver takes precedence over the media type.
        driver = (
            config.driver
            if config.driver
            else get_driver(mediaType=config.mediaType)
        )

        coll = get_collection(config.collection_id)

        if config.label:
            inst = coll[config.label]
        elif config.datamodel:
            instances = coll.get_instances(
                metaid=config.datamodel,
                property_mappings=True,
                allow_incomplete=config.allow_incomplete,
            )
            # Pass a default so that an exhausted iterator results in a
            # clear error instead of a bare StopIteration.
            inst = next(instances, None)
            if inst is None:
                raise ValueError(
                    f"No instance of datamodel '{config.datamodel}' could "
                    "be obtained from the collection."
                )
        elif config.store_collection:
            if config.store_collection_id:
                inst = coll.copy(newid=config.store_collection_id)
            else:
                inst = coll
        else:
            # None of the supported selectors was configured.
            raise ValueError(
                "One of `label`, `datamodel` or `store_collection` "
                "configurations should be given."
            )

        # Save instance
        if config.location:
            inst.save(driver, config.location, config.options)
        else:  # missing test
            if cacheconfig and cacheconfig.accessKey:
                key = cacheconfig.accessKey
            else:  # missing test
                key = "generate_data"
            cache = DataCache()
            # Serialise to a temporary file and copy its bytes into the
            # data cache.
            with tempfile.TemporaryDirectory() as tmpdir:
                inst.save(driver, f"{tmpdir}/data", config.options)
                with Path(f"{tmpdir}/data").open("rb") as f:
                    cache.add(f.read(), key=key)

        # Store documentation of this instance in the knowledge base
        if config.kb_document_class:

            # Import here to avoid hard dependencies on tripper.
            from tripper import RDF
            from tripper.convert import save_container

            # The setting may be given as a JSON string or as a dict.
            kb_settings = config.dlite_settings.get("tripper.triplestore")
            if isinstance(kb_settings, str):
                kb_settings = json.loads(kb_settings)
            if kb_settings and not isinstance(kb_settings, dict):
                raise ValueError(
                    "The `kb_document_class` configuration expects a dict "
                    "with settings for the tripper.triplestore."
                )

            if TYPE_CHECKING:  # pragma: no cover
                # This block will only be run by mypy when checking typing
                assert (
                    isinstance(kb_settings, dict) or kb_settings is None
                )  # nosec

            # IRI of new individual
            iri = individual_iri(
                class_iri=config.kb_document_class,
                base_iri=config.kb_document_base_iri,
            )

            triples = [(iri, RDF.type, config.kb_document_class)]
            if config.kb_document_context:
                for prop, val in config.kb_document_context.items():
                    triples.append((iri, prop, val))

            ts = get_triplestore(
                kb_settings=kb_settings,
                collection_id=config.collection_id,
            )
            try:
                if config.kb_document_computation:
                    comput = individual_iri(
                        class_iri=config.kb_document_computation,
                        base_iri=config.kb_document_base_iri,
                    )
                    triples.extend(
                        [
                            (comput, RDF.type, config.kb_document_computation),
                            (comput, hasOutput, iri),
                        ]
                    )

                    # Relate computation individual `comput` to its
                    # input individuals.
                    #
                    # This simple implementation works against KB.  It
                    # assumes that the input of
                    # `kb_document_computation` is documented in the
                    # KB and that there only exists one individual of each
                    # input class.
                    #
                    # In the case of multiple individuals of the input
                    # classes, the workflow executor must be involved
                    # in the documentation.  It can either do the
                    # documentation itself or provide a callback
                    # providing the needed info, which can be called
                    # from this strategy.

                    # Relate to input dataset individuals
                    restrictions = ts.restrictions(
                        config.kb_document_computation, hasInput
                    )
                    for r in restrictions:
                        input_class = r["value"]
                        indv = ts.value(predicate=RDF.type, object=input_class)
                        # A missing input individual violates the
                        # assumption above.  Fail early instead of adding
                        # a triple with None as object.
                        if indv is None:
                            raise KBError(
                                "No individual of input class "
                                f"'{input_class}' found in the "
                                "knowledge base."
                            )
                        triples.append((comput, r["property"], indv))

                    # Add output dataset individuals
                    restrictions = ts.restrictions(
                        config.kb_document_computation, hasOutput
                    )
                    for r in restrictions:
                        output_class = r["value"]
                        indv = ts.value(
                            predicate=RDF.type,
                            object=output_class,
                            default=None,
                        )
                        # `iri` itself is already related to `comput`
                        # via `hasOutput` above.
                        if indv and indv != iri:
                            triples.append((comput, r["property"], indv))

                # Document data source
                resource = {
                    "dataresource": {
                        "type": config.kb_document_class,
                        "downloadUrl": config.location,
                        "mediaType": (
                            config.mediaType
                            if config.mediaType
                            else "application/vnd.dlite-parse"
                        ),
                        "configuration": {
                            "datamodel": (
                                config.datamodel
                                if config.datamodel
                                else inst.meta.uri
                            ),
                            "driver": config.driver,
                            "options": (  # Trying to be clever here...
                                config.options.replace("mode=w", "mode=r")
                                if config.options
                                else config.options
                            ),
                        },
                    },
                    # "parse": {},  # Not supported by OTEAPI yet...
                    "mapping": {
                        "mappingType": "mappings",
                        # __TODO__
                        # Populate prefixes and triples from mapping
                        # strategy in current partial pipeline
                        # "prefixes": {},
                        # "triples": [],
                    },
                }
                update_dict(resource, config.kb_document_update)

                save_container(
                    ts,
                    resource,
                    iri,
                    recognised_keys="basic",
                )
                ts.add_triples(triples)

            finally:
                # Always release the triplestore, also on errors.
                ts.close()

        # __TODO__
        # Can we safely assume that all strategies in a pipeline will be
        # executed in the same Python interpreter?  If not, we should write
        # the collection to a storage, such that it can be shared with the
        # other strategies.

        update_collection(coll)
        return DLiteResult(collection_id=coll.uuid)

function_config: DLiteGenerateConfig instance-attribute

get()

Execute the strategy.

This method will be called through the strategy-specific endpoint of the OTE-API Services.

Returns:

Type Description
DLiteResult

DLiteResult instance with the UUID of the collection.

Source code in oteapi_dlite/strategies/generate.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
def get(self) -> DLiteResult:
    """Execute the strategy.

    This method will be called through the strategy-specific endpoint
    of the OTE-API Services.

    The instance to store is selected with `label`, `datamodel` or
    `store_collection` (checked in that order).  It is saved to
    `location` if given, otherwise to the data cache.  If
    `kb_document_class` is set, the stored instance is also documented
    in the knowledge base.

    Returns:
        DLiteResult instance with the UUID of the collection.

    Raises:
        ValueError: If none of `label`, `datamodel` or
            `store_collection` is configured, if no instance of
            `datamodel` can be obtained from the collection, or if
            the "tripper.triplestore" setting is not a dict.
        KBError: If the knowledge base holds no individual of an
            input class of `kb_document_computation`.
    """
    config = self.function_config.configuration
    cacheconfig = config.datacache_config

    # An explicit driver takes precedence over the media type.
    driver = (
        config.driver
        if config.driver
        else get_driver(mediaType=config.mediaType)
    )

    coll = get_collection(config.collection_id)

    if config.label:
        inst = coll[config.label]
    elif config.datamodel:
        instances = coll.get_instances(
            metaid=config.datamodel,
            property_mappings=True,
            allow_incomplete=config.allow_incomplete,
        )
        # Pass a default so that an exhausted iterator results in a
        # clear error instead of a bare StopIteration.
        inst = next(instances, None)
        if inst is None:
            raise ValueError(
                f"No instance of datamodel '{config.datamodel}' could "
                "be obtained from the collection."
            )
    elif config.store_collection:
        if config.store_collection_id:
            inst = coll.copy(newid=config.store_collection_id)
        else:
            inst = coll
    else:
        # None of the supported selectors was configured.
        raise ValueError(
            "One of `label`, `datamodel` or `store_collection` "
            "configurations should be given."
        )

    # Save instance
    if config.location:
        inst.save(driver, config.location, config.options)
    else:  # missing test
        if cacheconfig and cacheconfig.accessKey:
            key = cacheconfig.accessKey
        else:  # missing test
            key = "generate_data"
        cache = DataCache()
        # Serialise to a temporary file and copy its bytes into the
        # data cache.
        with tempfile.TemporaryDirectory() as tmpdir:
            inst.save(driver, f"{tmpdir}/data", config.options)
            with Path(f"{tmpdir}/data").open("rb") as f:
                cache.add(f.read(), key=key)

    # Store documentation of this instance in the knowledge base
    if config.kb_document_class:

        # Import here to avoid hard dependencies on tripper.
        from tripper import RDF
        from tripper.convert import save_container

        # The setting may be given as a JSON string or as a dict.
        kb_settings = config.dlite_settings.get("tripper.triplestore")
        if isinstance(kb_settings, str):
            kb_settings = json.loads(kb_settings)
        if kb_settings and not isinstance(kb_settings, dict):
            raise ValueError(
                "The `kb_document_class` configuration expects a dict "
                "with settings for the tripper.triplestore."
            )

        if TYPE_CHECKING:  # pragma: no cover
            # This block will only be run by mypy when checking typing
            assert (
                isinstance(kb_settings, dict) or kb_settings is None
            )  # nosec

        # IRI of new individual
        iri = individual_iri(
            class_iri=config.kb_document_class,
            base_iri=config.kb_document_base_iri,
        )

        triples = [(iri, RDF.type, config.kb_document_class)]
        if config.kb_document_context:
            for prop, val in config.kb_document_context.items():
                triples.append((iri, prop, val))

        ts = get_triplestore(
            kb_settings=kb_settings,
            collection_id=config.collection_id,
        )
        try:
            if config.kb_document_computation:
                comput = individual_iri(
                    class_iri=config.kb_document_computation,
                    base_iri=config.kb_document_base_iri,
                )
                triples.extend(
                    [
                        (comput, RDF.type, config.kb_document_computation),
                        (comput, hasOutput, iri),
                    ]
                )

                # Relate computation individual `comput` to its
                # input individuals.
                #
                # This simple implementation works against KB.  It
                # assumes that the input of
                # `kb_document_computation` is documented in the
                # KB and that there only exists one individual of each
                # input class.
                #
                # In the case of multiple individuals of the input
                # classes, the workflow executor must be involved
                # in the documentation.  It can either do the
                # documentation itself or provide a callback
                # providing the needed info, which can be called
                # from this strategy.

                # Relate to input dataset individuals
                restrictions = ts.restrictions(
                    config.kb_document_computation, hasInput
                )
                for r in restrictions:
                    input_class = r["value"]
                    indv = ts.value(predicate=RDF.type, object=input_class)
                    # A missing input individual violates the
                    # assumption above.  Fail early instead of adding
                    # a triple with None as object.
                    if indv is None:
                        raise KBError(
                            "No individual of input class "
                            f"'{input_class}' found in the "
                            "knowledge base."
                        )
                    triples.append((comput, r["property"], indv))

                # Add output dataset individuals
                restrictions = ts.restrictions(
                    config.kb_document_computation, hasOutput
                )
                for r in restrictions:
                    output_class = r["value"]
                    indv = ts.value(
                        predicate=RDF.type,
                        object=output_class,
                        default=None,
                    )
                    # `iri` itself is already related to `comput`
                    # via `hasOutput` above.
                    if indv and indv != iri:
                        triples.append((comput, r["property"], indv))

            # Document data source
            resource = {
                "dataresource": {
                    "type": config.kb_document_class,
                    "downloadUrl": config.location,
                    "mediaType": (
                        config.mediaType
                        if config.mediaType
                        else "application/vnd.dlite-parse"
                    ),
                    "configuration": {
                        "datamodel": (
                            config.datamodel
                            if config.datamodel
                            else inst.meta.uri
                        ),
                        "driver": config.driver,
                        "options": (  # Trying to be clever here...
                            config.options.replace("mode=w", "mode=r")
                            if config.options
                            else config.options
                        ),
                    },
                },
                # "parse": {},  # Not supported by OTEAPI yet...
                "mapping": {
                    "mappingType": "mappings",
                    # __TODO__
                    # Populate prefixes and triples from mapping
                    # strategy in current partial pipeline
                    # "prefixes": {},
                    # "triples": [],
                },
            }
            update_dict(resource, config.kb_document_update)

            save_container(
                ts,
                resource,
                iri,
                recognised_keys="basic",
            )
            ts.add_triples(triples)

        finally:
            # Always release the triplestore, also on errors.
            ts.close()

    # __TODO__
    # Can we safely assume that all strategies in a pipeline will be
    # executed in the same Python interpreter?  If not, we should write
    # the collection to a storage, such that it can be shared with the
    # other strategies.

    update_collection(coll)
    return DLiteResult(collection_id=coll.uuid)

initialize()

Initialize.

Source code in oteapi_dlite/strategies/generate.py
257
258
259
260
261
262
263
def initialize(self) -> DLiteResult:
    """Look up the configured collection and report its UUID.

    Returns:
        DLiteResult whose `collection_id` is the UUID of the collection
        returned by `get_collection()` for the configured `collection_id`.
    """
    config = self.function_config.configuration
    coll = get_collection(config.collection_id)
    return DLiteResult(collection_id=coll.uuid)

DLiteStorageConfig

Bases: DLiteConfiguration

Configuration for a generic DLite storage filter.

The DLite storage driver can be specified using either the driver or mediaType field.

Where the output should be written is specified using either the location or datacache_config.accessKey field.

One of label, datamodel or store_collection should be provided.

Source code in oteapi_dlite/strategies/generate.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
class DLiteStorageConfig(DLiteConfiguration):
    """Configuration for a generic DLite storage filter.

    The DLite storage driver can be specified using either the `driver`
    or `mediaType` field.

    Where the output should be written is specified using either the
    `location` or `datacache_config.accessKey` field.

    What to store is selected with one of `label`, `datamodel` or
    `store_collection`.

    The `kb_document_*` fields control optional documentation of the
    stored instance in the knowledge base.
    """

    # --- Storage driver and destination ---
    driver: Annotated[
        Optional[str],
        Field(
            description='Name of DLite driver (ex: "json").',
        ),
    ] = None
    mediaType: Annotated[
        Optional[str],
        Field(
            description='Media type for DLite driver (ex: "application/json").',
        ),
    ] = None
    options: Annotated[
        Optional[str],
        Field(
            description=(
                "Comma-separated list of options passed to the DLite "
                "storage plugin."
            ),
        ),
    ] = None
    location: Annotated[
        Optional[str],
        Field(
            description=(
                "Location of storage to write to.  If unset, the output is "
                "stored in the data cache using the key provided with "
                "`datacache_config.accessKey` (defaults to 'generate_data')."
            ),
        ),
    ] = None

    # --- Selection of what to store ---
    label: Annotated[
        Optional[str],
        Field(
            description=(
                "Label of DLite instance in the collection to serialise."
            ),
        ),
    ] = None
    datamodel: Annotated[
        Optional[str],
        Field(
            description=(
                "URI to the datamodel of the new instance.  Needed when "
                "generating the instance from mappings.  Cannot be combined "
                "with `label`"
            ),
        ),
    ] = None
    store_collection: Annotated[
        bool,
        Field(
            description="Whether to store the entire collection in the session "
            "instead of a single instance.  Cannot be combined with `label` or "
            "`datamodel`.",
        ),
    ] = False
    store_collection_id: Annotated[
        Optional[str],
        Field(
            description="Used together with `store_collection`.  If given, "
            "store a copy of the collection with this id.",
        ),
    ] = None
    allow_incomplete: Annotated[
        Optional[bool],
        Field(
            description="Whether to allow incomplete property mappings.",
        ),
    ] = False
    datacache_config: Annotated[
        Optional[DataCacheConfig],
        Field(
            description="Configuration options for the local data cache.",
        ),
    ] = None

    # --- Documentation in the knowledge base ---
    kb_document_class: Annotated[
        Optional[str],
        Field(
            description=(
                "IRI of a class in the ontology."
                "\n\n"
                "If given, the generated DLite instance is documented in the "
                "knowledge base as an instance of this class."
                "\n\n"
                "Expects that the 'tripper.triplestore' setting has been "
                "set using the SettingsStrategy (vnd.dlite-settings). "
                "This setting should be a dict that can be passed "
                "as keyword arguments to `tripper.Triplestore()`."
                "\n\n"
                "Example of adding expected settings using OTELib:\n"
                "\n\n"
                ">>> kb_settings = client.create_filter(\n"
                "...     filterType='application/vnd.dlite-settings',\n"
                "...     configuration={\n"
                "...         'label': 'tripper.triplestore',\n"
                "...         'settings': {\n"
                "...             'backend': 'rdflib',\n"
                "...             'triplestore_url': '/path/to/local/kb.ttl',\n"
                "...         },\n"
                "...     },\n"
                "... )\n"
                ">>> generate = client.create_function(\n"
                "...     functionType='application/vnd.dlite-generate',\n"
                "...     configuration={\n"
                "...         'kb_document_class': 'http://example.com#MyClass',\n"
                "...         ...\n"
                "...     },\n"
                "... )\n"
                ">>> pipeline = ... >> generate >> kb_settings\n"
                ">>> pipeline.get()\n"
            ),
        ),
    ] = None
    kb_document_update: Annotated[
        Optional[dict],
        Field(
            description=(
                "Dict updating the documentation (partial pipeline) created "
                "with `kb_document_class`."
                "\n\n"
                "This dict should be structured as follows: "
                "\n\n"
                "    {\n"
                '      "dataresource": {...},\n'
                '      "parse": {...},\n'
                '      "mapping": {...}\n'
                "    }\n"
                "\n"
                "where the provided items will override the default "
                "configurations in respective partial pipeline created by "
                '`kb_document_class`.  Any of the items "dataresource", '
                '"parse" and "mapping" are optional.',
            ),
        ),
    ] = None
    kb_document_base_iri: Annotated[
        str, Field(description="Base IRI or prefix for created individuals.")
    ] = ":"
    kb_document_context: Annotated[
        Optional[dict],
        Field(
            description=(
                "If `kb_document_class` is given, this configuration will add "
                "additional context to the documentation of the generated "
                "individual."
                "\n\n"
                "This might be useful to make it easy to later access the "
                "generated individual."
                "\n\n"
                "This configuration should be a dict mapping providing the "
                "additional documentation of the driver. It should map OWL "
                "properties to either tripper literals or IRIs."
                "\n\n"
                "Example: `{RDF.type: ONTO.MyDataSet, "
                "EMMO.isDescriptionFor: ONTO.MyMaterial}`"
            ),
        ),
    ] = None
    kb_document_computation: Annotated[
        Optional[str],
        Field(
            description=(
                "IRI of a computation subclass."
                "\n\n"
                "Requires `kb_document_class`, and is used to "
                "document the computation (model) that the "
                "individual (of `kb_document_class`) to be documented "
                "is output of. "
                "When `kb_document_computation` is given a new individual of "
                "the computation subclass is created. Input and "
                "output datasets are documented using the relation "
                "`emmo:hasInput` and `emmo:hasOutput`, "
                "respectively.  The individual of `kb_document_class` is "
                "one of the output individuals."
                "\n\n"
                "Note: This configuration relies on several assumptions:\n"
                "  - The `kb_document_computation` class exists in the "
                "knowledge base and is related to its input and output "
                "dataset classes via `emmo:hasInput` and `emmo:hasOutput` "
                "restrictions, respectively.\n"
                "  - There exists only one individual of each input dataset "
                "class.\n"
                "  - There exists at most one individual of each output "
                "dataset class.\n"
            ),
        ),
    ] = None

allow_incomplete: Annotated[Optional[bool], Field(description='Whether to allow incomplete property mappings.')] = False class-attribute instance-attribute

datacache_config: Annotated[Optional[DataCacheConfig], Field(description='Configuration options for the local data cache.')] = None class-attribute instance-attribute

datamodel: Annotated[Optional[str], Field(description='URI to the datamodel of the new instance. Needed when generating the instance from mappings. Cannot be combined with `label`')] = None class-attribute instance-attribute

driver: Annotated[Optional[str], Field(description='Name of DLite driver (ex: "json").')] = None class-attribute instance-attribute

kb_document_base_iri: Annotated[str, Field(description='Base IRI or prefix for created individuals.')] = ':' class-attribute instance-attribute

kb_document_class: Annotated[Optional[str], Field(description="IRI of a class in the ontology.\n\nIf given, the generated DLite instance is documented in the knowledge base as an instance of this class.\n\nExpects that the 'tripper.triplestore' setting has been set using the SettingsStrategy (vnd.dlite-settings). This settings should be a dict that can be passed as keyword arguments to `tripper.Triplestore()`.\n\nExample of adding expected settings using OTELib:\n\n\n>>> kb_settings = client.create_filter(\n... filterType='application/vnd.dlite-settings',\n... configuration={\n... 'label': 'tripper.triplestore',\n... 'settings': {\n... 'backend': 'rdflib',\n... 'triplestore_url': '/path/to/local/kb.ttl',\n... },\n... },\n... )\n>>> generate = client.create_function(\n... functionType='application/vnd.dlite-generate'\n... configuration={\n... kb_document_class='http://example.com#MyClass'\n... ...\n... },\n... )\n>>> pipeline = ... >> generate >> kb_settings\n>>> pipeline.get()\n")] = None class-attribute instance-attribute

kb_document_computation: Annotated[Optional[str], Field(description='IRI of a computation subclass.\n\nRequires `kb_document_class`, and is used to document the computation (model) that the individual (of `kb_document_class`) to be documented is output of.When `kb_document_computation` is given a new individual of the computation subclass is created. Input and output datasets are documented using the relation `emmo:hasInput` and `emmo:hasOutput`, respectively. The individual of `kb_document_class` is one of the output individuals.\n\nNote: This configuration relies on several assumptions:\n - The `kb_document_computation` class exists in the knowledge base and is related to its input and output dataset classes via `emmo:hasInput` and `emmo:hasOutput` restrictions, respectively.\n - There exists only one individual of each input dataset class.\n - There exists at most one individual of each output dataset class.\n')] = None class-attribute instance-attribute

kb_document_context: Annotated[Optional[dict], Field(description='If `kb_document_class` is given, this configuration will add additional context to the documentation of the generated individual.\n\nThis might be useful to make it easy to later access the generated individual.\n\nThis configuration should be a dict mapping providing the additional documentation of the driver. It should map OWL properties to either tripper literals or IRIs.\n\nExample: `{RDF.type: ONTO.MyDataSet, EMMO.isDescriptionFor: ONTO.MyMaterial}`')] = None class-attribute instance-attribute

kb_document_update: Annotated[Optional[dict], Field(description=('Dict updating the documentation (partial pipeline) created with `kb_document_class`.\n\nThis dict should be structured as follows: \n\n {\n "dataresource": {...},\n "parse": {...}\n "mapping": {...}\n }\n\nwhere the provided items will override the the default configurations in respective partial pipeline created by `kb_document_class`. Any of the items "dataresource", "parse" and "mapping" are optional.'))] = None class-attribute instance-attribute

label: Annotated[Optional[str], Field(description='Label of DLite instance in the collection to serialise.')] = None class-attribute instance-attribute

location: Annotated[Optional[str], Field(description="Location of storage to write to. If unset to store in data cache using the key provided with `datacache_config.accessKey` (defaults to 'generate_data').")] = None class-attribute instance-attribute

mediaType: Annotated[Optional[str], Field(description='Media type for DLite driver (ex: "application/json").')] = None class-attribute instance-attribute

options: Annotated[Optional[str], Field(description='Comma-separated list of options passed to the DLite storage plugin.')] = None class-attribute instance-attribute

store_collection: Annotated[bool, Field(description='Whether to store the entire collection in the session instead of a single instance. Cannot be combined with `label` or `datamodel`.')] = False class-attribute instance-attribute

store_collection_id: Annotated[Optional[str], Field(description='Used together with `store_collection` If given, store a copy of the collection with this id.')] = None class-attribute instance-attribute

KBError

Bases: ValueError

Invalid data in knowledge base.

Source code in oteapi_dlite/strategies/generate.py
30
31
class KBError(ValueError):
    """Raised when the knowledge base contains invalid data.

    Subclasses `ValueError`, so handlers for `ValueError` also catch it.
    """

individual_iri(class_iri, base_iri=':', randbytes=6)

Return an IRI for an individual of a class.

Parameters:

Name Type Description Default
class_iri str

IRI of the class to create an individual of.

required
base_iri str

Base IRI of the created individual.

':'
randbytes int

Number of random bytes to include in the returned IRI.

6

Returns:

Type Description
str

IRI of a new individual.

Source code in oteapi_dlite/strategies/generate.py
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
def individual_iri(
    class_iri: str, base_iri: str = ":", randbytes: int = 6
) -> str:
    """Build a fresh IRI naming an individual of the given class.

    The local name of `class_iri` (what follows the first ":", then the
    last "/", then the last "#") is lower-cased and joined with a random
    hexadecimal suffix.

    Arguments:
        class_iri: IRI of the class to create an individual of.
        base_iri: Prefix placed in front of the generated name.
        randbytes: Number of random bytes in the suffix; each byte is
            rendered as two hexadecimal digits.

    Returns:
        IRI of a new individual.

    """
    local_name = class_iri.split(":", 1)[-1]
    for separator in ("/", "#"):
        local_name = local_name.rsplit(separator, 1)[-1]
    random_suffix = os.urandom(randbytes).hex()
    return f"{base_iri}{local_name.lower()}-{random_suffix}"

mapping

Mapping filter strategy.

DLiteMappingConfig

Bases: MappingConfig

DLite mapping strategy config.

Source code in oteapi_dlite/strategies/mapping.py
34
35
36
37
38
39
40
41
42
class DLiteMappingConfig(MappingConfig):
    """DLite mapping strategy config.

    Extends `MappingConfig` with a typed `configuration` field that holds
    the DLite-specific settings of the mapping strategy.
    """

    # Optional for callers: when omitted, a `DLiteMappingStrategyConfig`
    # built with all of its defaults is used.
    # NOTE(review): the default is one instance created when this class body
    # is evaluated -- confirm that pydantic copies it per model instance.
    configuration: Annotated[
        DLiteMappingStrategyConfig,
        Field(
            description="DLite mapping strategy-specific configuration.",
        ),
    ] = DLiteMappingStrategyConfig()

configuration: Annotated[DLiteMappingStrategyConfig, Field(description='DLite mapping strategy-specific configuration.')] = DLiteMappingStrategyConfig() class-attribute instance-attribute

DLiteMappingStrategy

Strategy for a mapping.

Registers strategies:

  • ("mappingType", "mappings")
Source code in oteapi_dlite/strategies/mapping.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@dataclass
class DLiteMappingStrategy:
    """Mapping strategy that feeds prefixes and triples to a triplestore.

    **Registers strategies**:

    - `("mappingType", "mappings")`

    """

    mapping_config: DLiteMappingConfig

    def initialize(self) -> DLiteResult:
        """Bind the configured prefixes and add the configured triples.

        Returns:
            Result referring to the collection that was used.

        Raises:
            ValueError: If the `tripper.triplestore` setting is non-empty
                and, after JSON-decoding a string value, not a dictionary.

        """
        strategy_config = self.mapping_config.configuration
        collection = get_collection(strategy_config.collection_id)

        # The setting may arrive JSON-encoded; decode it before validating.
        kb_settings = strategy_config.dlite_settings.get(
            "tripper.triplestore"
        )
        if isinstance(kb_settings, str):
            kb_settings = json.loads(kb_settings)
        if kb_settings:
            if not isinstance(kb_settings, dict):
                raise ValueError(
                    "The `tripper.triplestore` setting must be a dictionary."
                )

        if TYPE_CHECKING:  # pragma: no cover
            # Only evaluated by mypy, to narrow the type of `kb_settings`.
            assert isinstance(kb_settings, dict) or kb_settings is None  # nosec

        triplestore = get_triplestore(
            kb_settings=kb_settings, collection_id=collection.uuid
        )

        prefixes = self.mapping_config.prefixes
        if prefixes:
            for prefix, iri in prefixes.items():
                triplestore.bind(prefix, iri)

        triples = self.mapping_config.triples
        if triples:
            # Expand string terms to full IRIs; other terms pass unchanged.
            expanded_triples = []
            for triple in triples:
                expanded_triples.append(
                    [
                        triplestore.expand_iri(term)
                        if isinstance(term, str)
                        else term
                        for term in triple
                    ]
                )
            triplestore.add_triples(expanded_triples)

        update_collection(collection)
        return DLiteResult(collection_id=collection.uuid)

    def get(self) -> DLiteResult:
        """Return a result referring to the configured collection."""
        collection = get_collection(
            self.mapping_config.configuration.collection_id
        )
        return DLiteResult(collection_id=collection.uuid)

mapping_config: DLiteMappingConfig instance-attribute

get()

Execute strategy and return a dictionary.

Source code in oteapi_dlite/strategies/mapping.py
 95
 96
 97
 98
 99
100
101
def get(self) -> DLiteResult:
    """Return a result referring to the configured collection."""
    collection = get_collection(
        self.mapping_config.configuration.collection_id
    )
    return DLiteResult(collection_id=collection.uuid)

initialize()

Initialize strategy.

Source code in oteapi_dlite/strategies/mapping.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def initialize(self) -> DLiteResult:
    """Bind the configured prefixes and add the configured triples.

    Returns:
        Result referring to the collection that was used.

    Raises:
        ValueError: If the `tripper.triplestore` setting is non-empty
            and, after JSON-decoding a string value, not a dictionary.

    """
    strategy_config = self.mapping_config.configuration
    collection = get_collection(strategy_config.collection_id)

    # The setting may arrive JSON-encoded; decode it before validating.
    kb_settings = strategy_config.dlite_settings.get("tripper.triplestore")
    if isinstance(kb_settings, str):
        kb_settings = json.loads(kb_settings)
    if kb_settings:
        if not isinstance(kb_settings, dict):
            raise ValueError(
                "The `tripper.triplestore` setting must be a dictionary."
            )

    if TYPE_CHECKING:  # pragma: no cover
        # Only evaluated by mypy, to narrow the type of `kb_settings`.
        assert isinstance(kb_settings, dict) or kb_settings is None  # nosec

    triplestore = get_triplestore(
        kb_settings=kb_settings, collection_id=collection.uuid
    )

    prefixes = self.mapping_config.prefixes
    if prefixes:
        for prefix, iri in prefixes.items():
            triplestore.bind(prefix, iri)

    triples = self.mapping_config.triples
    if triples:
        # Expand string terms to full IRIs; other terms pass unchanged.
        expanded_triples = []
        for triple in triples:
            expanded_triples.append(
                [
                    triplestore.expand_iri(term)
                    if isinstance(term, str)
                    else term
                    for term in triple
                ]
            )
        triplestore.add_triples(expanded_triples)

    update_collection(collection)
    return DLiteResult(collection_id=collection.uuid)

DLiteMappingStrategyConfig

Bases: DLiteConfiguration

Configuration for a DLite mapping filter.

Source code in oteapi_dlite/strategies/mapping.py
23
24
25
26
27
28
29
30
31
class DLiteMappingStrategyConfig(DLiteConfiguration):
    """Configuration for a DLite mapping filter.

    Adds an optional `datamodel` URI to the fields inherited from
    `DLiteConfiguration`.
    """

    # Optional; validated as a URL (`AnyUrl`) when given, `None` otherwise.
    datamodel: Annotated[
        Optional[AnyUrl],
        Field(
            description="URI of the datamodel that is mapped.",
        ),
    ] = None

datamodel: Annotated[Optional[AnyUrl], Field(description='URI of the datamodel that is mapped.')] = None class-attribute instance-attribute

parse

Generic parse strategy using DLite storage plugin.

DLiteParseConfig

Bases: DLiteResult

Configuration for generic DLite parser.

Source code in oteapi_dlite/strategies/parse.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class DLiteParseConfig(DLiteResult):
    """Configuration for generic DLite parser.

    Combines resource-style fields (`downloadUrl`, `mediaType`) with the
    options forwarded to the DLite storage plugin.  Only `driver` has no
    default and must therefore be supplied.
    """

    # "Required" resource strategy fields
    # Both default to `None`; their descriptions are reused from the
    # corresponding `ResourceConfig` fields.
    downloadUrl: Annotated[
        Optional[HostlessAnyUrl],
        Field(
            description=ResourceConfig.model_fields["downloadUrl"].description
        ),
    ] = None

    mediaType: Annotated[
        Optional[str],
        Field(description=ResourceConfig.model_fields["mediaType"].description),
    ] = None

    # Parser-specific configuration
    # Mandatory: no default value is declared for `driver`.
    driver: Annotated[
        str,
        Field(
            description='Name of DLite driver (ex: "json").',
        ),
    ]
    location: Annotated[
        Optional[str],
        Field(
            description=(
                "Explicit location of storage.  Normally data is read from the "
                "data cache using `datacache_config.accessKey` (default: "
                "'key')."
            ),
        ),
    ] = None
    options: Annotated[
        Optional[str],
        Field(
            description=(
                "Comma-separated list of options passed to the DLite storage "
                "plugin."
            ),
        ),
    ] = None
    id: Annotated[
        Optional[str],
        Field(
            description="If given, the id of the instance in the storage.",
        ),
    ] = None
    label: Annotated[
        Optional[str],
        Field(
            description=(
                "Optional label of the new DLite instance in the collection."
            ),
        ),
    ] = None
    datamodel: Annotated[
        Optional[str],
        Field(
            description=(
                "DLite datamodel documenting the structure of the data set. "
                "Often unused, since the datamodel is implicitly defined in "
                "the DLite driver (DLite plugin), but for a documentation "
                "point of view this is a very important field."
            ),
        ),
    ] = None
    # Defaults to an empty `AttrDict`.
    # NOTE(review): the default is one instance created when this class body
    # is evaluated -- confirm that pydantic copies it per model instance.
    download_config: Annotated[
        AttrDict,
        Field(description="Configurations provided to a download strategy."),
    ] = AttrDict()
    datacache_config: Annotated[
        Optional[DataCacheConfig],
        Field(
            description="Configuration options for the local data cache.",
        ),
    ] = None

datacache_config: Annotated[Optional[DataCacheConfig], Field(description='Configuration options for the local data cache.')] = None class-attribute instance-attribute

datamodel: Annotated[Optional[str], Field(description='DLite datamodel documenting the structure of the data set. Often unused, since the datamodel is implicitly defined in the DLite driver (DLite plugin), but for a documentation point of view this is a very important field.')] = None class-attribute instance-attribute

downloadUrl: Annotated[Optional[HostlessAnyUrl], Field(description=ResourceConfig.model_fields['downloadUrl'].description)] = None class-attribute instance-attribute

download_config: Annotated[AttrDict, Field(description='Configurations provided to a download strategy.')] = AttrDict() class-attribute instance-attribute

driver: Annotated[str, Field(description='Name of DLite driver (ex: "json").')] instance-attribute

id: Annotated[Optional[str], Field(description='If given, the id of the instance in the storage.')] = None class-attribute instance-attribute

label: Annotated[Optional[str], Field(description='Optional label of the new DLite instance in the collection.')] = None class-attribute instance-attribute

location: Annotated[Optional[str], Field(description="Explicit location of storage. Normally data is read from the data cache using `datacache_config.accessKey` (default: 'key').")] = None class-attribute instance-attribute

mediaType: Annotated[Optional[str], Field(description=ResourceConfig.model_fields['mediaType'].description)] = None class-attribute instance-attribute

options: Annotated[Optional[str], Field(description='Comma-separated list of options passed to the DLite storage plugin.')] = None class-attribute instance-attribute

DLiteParseParserConfig

Bases: ParserConfig

DLite parse strategy resource config.

Source code in oteapi_dlite/strategies/parse.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class DLiteParseParserConfig(ParserConfig):
    """DLite parse strategy resource config.

    Narrows `ParserConfig` to the DLite parser: `parserType` accepts a
    single literal value and `configuration` is typed as `DLiteParseConfig`.
    """

    # Mandatory; the only accepted value is "application/vnd.dlite-parse".
    parserType: Annotated[
        Literal["application/vnd.dlite-parse"],
        Field(description=ParserConfig.model_fields["parserType"].description),
    ]
    # Mandatory strategy-specific settings.
    configuration: Annotated[
        DLiteParseConfig,
        Field(description="DLite parse strategy-specific configuration."),
    ]
    # Optional here (defaults to `None`); the description is reused from
    # the `ParserConfig.entity` field.
    entity: Annotated[
        Optional[AnyHttpUrl],
        Field(description=ParserConfig.model_fields["entity"].description),
    ] = None

configuration: Annotated[DLiteParseConfig, Field(description='DLite parse strategy-specific configuration.')] instance-attribute

entity: Annotated[Optional[AnyHttpUrl], Field(description=ParserConfig.model_fields['entity'].description)] = None class-attribute instance-attribute

parserType: Annotated[Literal['application/vnd.dlite-parse'], Field(description=ParserConfig.model_fields['parserType'].description)] instance-attribute

DLiteParseStrategy

Generic DLite parse strategy utilising DLite storage plugins.

Registers strategies:

  • ("mediaType", "application/vnd.dlite-parse")
Source code in oteapi_dlite/strategies/parse.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
@dataclass
class DLiteParseStrategy:
    """Generic DLite parse strategy utilising DLite storage plugins.

    **Registers strategies**:

    - `("mediaType", "application/vnd.dlite-parse")`

    """

    parse_config: DLiteParseParserConfig

    def initialize(self) -> DLiteResult:
        """Return a result referring to the configured collection."""
        return DLiteResult(
            collection_id=get_collection(
                self.parse_config.configuration.collection_id
            ).uuid
        )

    def get(self) -> DLiteResult:
        """Load a DLite instance and add it to the collection.

        This method will be called through the strategy-specific endpoint
        of the OTE-API Services.

        The instance is read from `location` when that is configured.
        Otherwise the "download" strategy is run and the instance is read
        from the file obtained from the data cache.

        Returns:
            Reference to a DLite collection ID.

        Raises:
            RuntimeError: If the content was downloaded but no data cache
                key is available for it.

        """
        config = self.parse_config.configuration
        cacheconfig = config.datacache_config

        if config.driver:
            driver = config.driver
        else:
            # `mediaType` is a declared field of the strategy-specific
            # configuration, while the parser config declares only
            # `parserType`, `configuration` and `entity`.  Prefer a
            # parser-level value when one is present (the attribute read
            # previously) and otherwise use the configuration field, instead
            # of failing on a missing attribute.
            media_type = (
                getattr(self.parse_config, "mediaType", None)
                or config.mediaType
            )
            driver = get_driver(mediaType=media_type)

        # Create instance
        if config.location:
            inst = dlite.Instance.from_location(
                driver=driver,
                location=config.location,
                options=config.options,
                id=config.id,
            )
        else:
            # Download the file
            download_config = config.model_dump()
            download_config["configuration"] = (
                config.download_config.model_dump()
            )
            output = create_strategy("download", download_config).get()

            # An explicitly configured access key takes precedence over the
            # key reported by the download strategy.
            if cacheconfig and cacheconfig.accessKey:
                key = cacheconfig.accessKey
            elif "key" in output:
                key = output["key"]
            else:
                raise RuntimeError(
                    "No data cache key provided for the downloaded content."
                )

            # See if we can extract file suffix from downloadUrl
            if config.downloadUrl:
                suffix = Path(str(config.downloadUrl)).suffix
            else:
                suffix = None

            cache = DataCache(cacheconfig)
            with cache.getfile(key, suffix=suffix) as location:
                inst = dlite.Instance.from_location(
                    driver=driver,
                    location=str(location),
                    options=config.options,
                    id=config.id,
                )

        # Insert inst into collection, labelled by its UUID when no label
        # is configured.
        coll = get_collection(config.collection_id)
        label = config.label if config.label else inst.uuid
        coll.add(label, inst)

        # __TODO__
        # See
        # https://github.com/EMMC-ASBL/oteapi-dlite/pull/84#discussion_r1050437185
        # and following comments.
        #
        # Since we cannot safely assume that all strategies in a
        # pipeline will be executed in the same Python interpreter,
        # the collection should be written to a storage, such that it
        # can be shared with the other strategies.

        update_collection(coll)
        return DLiteResult(collection_id=coll.uuid)

parse_config: DLiteParseParserConfig instance-attribute

get()

Execute the strategy.

This method will be called through the strategy-specific endpoint of the OTE-API Services.

Returns:

Type Description
DLiteResult

Reference to a DLite collection ID.

Source code in oteapi_dlite/strategies/parse.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def get(self) -> DLiteResult:
    """Load a DLite instance and add it to the collection.

    This method will be called through the strategy-specific endpoint
    of the OTE-API Services.

    The instance is read from `location` when that is configured.
    Otherwise the "download" strategy is run and the instance is read
    from the file obtained from the data cache.

    Returns:
        Reference to a DLite collection ID.

    Raises:
        RuntimeError: If the content was downloaded but no data cache
            key is available for it.

    """
    config = self.parse_config.configuration
    cacheconfig = config.datacache_config

    if config.driver:
        driver = config.driver
    else:
        # `mediaType` is a declared field of the strategy-specific
        # configuration.  Prefer a parser-level value when one is present
        # (the attribute read previously) and otherwise use the
        # configuration field, instead of failing on a missing attribute.
        media_type = (
            getattr(self.parse_config, "mediaType", None) or config.mediaType
        )
        driver = get_driver(mediaType=media_type)

    # Create instance
    if config.location:
        inst = dlite.Instance.from_location(
            driver=driver,
            location=config.location,
            options=config.options,
            id=config.id,
        )
    else:
        # Download the file
        download_config = config.model_dump()
        download_config["configuration"] = (
            config.download_config.model_dump()
        )
        output = create_strategy("download", download_config).get()

        # An explicitly configured access key takes precedence over the
        # key reported by the download strategy.
        if cacheconfig and cacheconfig.accessKey:
            key = cacheconfig.accessKey
        elif "key" in output:
            key = output["key"]
        else:
            raise RuntimeError(
                "No data cache key provided for the downloaded content."
            )

        # See if we can extract file suffix from downloadUrl
        if config.downloadUrl:
            suffix = Path(str(config.downloadUrl)).suffix
        else:
            suffix = None

        cache = DataCache(cacheconfig)
        with cache.getfile(key, suffix=suffix) as location:
            inst = dlite.Instance.from_location(
                driver=driver,
                location=str(location),
                options=config.options,
                id=config.id,
            )

    # Insert inst into collection, labelled by its UUID when no label
    # is configured.
    coll = get_collection(config.collection_id)
    label = config.label if config.label else inst.uuid
    coll.add(label, inst)

    # __TODO__
    # See
    # https://github.com/EMMC-ASBL/oteapi-dlite/pull/84#discussion_r1050437185
    # and following comments.
    #
    # Since we cannot safely assume that all strategies in a
    # pipeline will be executed in the same Python interpreter,
    # the collection should be written to a storage, such that it
    # can be shared with the other strategies.

    update_collection(coll)
    return DLiteResult(collection_id=coll.uuid)

initialize()

Initialize.

Source code in oteapi_dlite/strategies/parse.py
139
140
141
142
143
144
145
def initialize(self) -> DLiteResult:
    """Return a result referring to the configured collection."""
    collection = get_collection(
        self.parse_config.configuration.collection_id
    )
    return DLiteResult(collection_id=collection.uuid)

parse_excel

Strategy for parsing an Excel spreadsheet to a DLite instance.

DLiteExcelParseConfig

Bases: DLiteResult

Configuration for DLite Excel parser.

Source code in oteapi_dlite/strategies/parse_excel.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class DLiteExcelParseConfig(DLiteResult):
    """Configuration for DLite Excel parser.

    Only `excel_config` has no default and must therefore be supplied.
    """

    # Resource config
    # Optional at model level; description reused from `ResourceConfig`.
    downloadUrl: Annotated[
        Optional[HostlessAnyUrl],
        Field(
            description=ResourceConfig.model_fields["downloadUrl"].description
        ),
    ] = None

    # Restricted to the xlsx media type, which is also the default value.
    mediaType: Annotated[
        Literal[
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        ],
        Field(description=ResourceConfig.model_fields["mediaType"].description),
    ] = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"

    # Parser config
    id: Annotated[
        Optional[str], Field(description="Optional id on new instance.")
    ] = None

    # Defaults to "excel-data".  The type is Optional, so an explicit `None`
    # is accepted by this model.
    # NOTE(review): `DLiteExcelSessionUpdate.label` is a required `str` --
    # confirm how a `None` label is meant to be handled.
    label: Annotated[
        Optional[str],
        Field(
            description="Optional label for new instance in collection.",
        ),
    ] = "excel-data"

    # Mandatory: no default value is declared.
    excel_config: Annotated[
        XLSXParseConfig,
        Field(
            description="DLite-specific excel configurations.",
        ),
    ]
    storage_path: Annotated[
        Optional[str],
        Field(
            description="Path to metadata storage",
        ),
    ] = None

downloadUrl: Annotated[Optional[HostlessAnyUrl], Field(description=ResourceConfig.model_fields['downloadUrl'].description)] = None class-attribute instance-attribute

excel_config: Annotated[XLSXParseConfig, Field(description='DLite-specific excel configurations.')] instance-attribute

id: Annotated[Optional[str], Field(description='Optional id on new instance.')] = None class-attribute instance-attribute

label: Annotated[Optional[str], Field(description='Optional label for new instance in collection.')] = 'excel-data' class-attribute instance-attribute

mediaType: Annotated[Literal['application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'], Field(description=ResourceConfig.model_fields['mediaType'].description)] = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' class-attribute instance-attribute

storage_path: Annotated[Optional[str], Field(description='Path to metadata storage')] = None class-attribute instance-attribute

DLiteExcelParserConfig

Bases: ParserConfig

DLite excel parse strategy resource config.

Source code in oteapi_dlite/strategies/parse_excel.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class DLiteExcelParserConfig(ParserConfig):
    """DLite excel parse strategy resource config.

    Narrows `ParserConfig` to the DLite Excel parser: `parserType` accepts
    a single literal value and `configuration` is typed as
    `DLiteExcelParseConfig`.
    """

    # Only "application/vnd.dlite-xlsx" is accepted; it is also the default.
    parserType: Annotated[
        Literal["application/vnd.dlite-xlsx"],
        Field(description=ParserConfig.model_fields["parserType"].description),
    ] = "application/vnd.dlite-xlsx"
    # Mandatory strategy-specific settings.
    configuration: Annotated[
        DLiteExcelParseConfig,
        Field(description="DLite excel parse strategy-specific configuration."),
    ]
    # Optional; defaults to `None`.
    entity: Annotated[
        Optional[AnyHttpUrl],
        Field(
            description=(
                "URI of DLite metadata to return. If not provided, the "
                "metadata will be inferred from the excel file."
            ),
        ),
    ] = None

configuration: Annotated[DLiteExcelParseConfig, Field(description='DLite excel parse strategy-specific configuration.')] instance-attribute

entity: Annotated[Optional[AnyHttpUrl], Field(description='URI of DLite metadata to return. If not provided, the metadata will be inferred from the excel file.')] = None class-attribute instance-attribute

parserType: Annotated[Literal['application/vnd.dlite-xlsx'], Field(description=ParserConfig.model_fields['parserType'].description)] = 'application/vnd.dlite-xlsx' class-attribute instance-attribute

DLiteExcelSessionUpdate

Bases: DLiteResult

Class for returning values from DLite excel parser.

Source code in oteapi_dlite/strategies/parse_excel.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
class DLiteExcelSessionUpdate(DLiteResult):
    """Class for returning values from DLite excel parser.

    Extends `DLiteResult` with the UUID and the label of the instance that
    was created.  Both fields are mandatory strings.
    """

    # Mandatory: no default value is declared.
    inst_uuid: Annotated[
        str,
        Field(
            description="UUID of new instance.",
        ),
    ]
    # Mandatory: no default value is declared.
    label: Annotated[
        str,
        Field(
            description="Label of the new instance in the collection.",
        ),
    ]

inst_uuid: Annotated[str, Field(description='UUID of new instance.')] instance-attribute

label: Annotated[str, Field(description='Label of the new instance in the collection.')] instance-attribute

DLiteExcelStrategy

Parse strategy for Excel files.

Registers strategies:

  • ("mediaType", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
Source code in oteapi_dlite/strategies/parse_excel.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
@dataclass
class DLiteExcelStrategy:
    """Parse strategy for Excel files.

    **Registers strategies**:

    - `("mediaType",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")`

    """

    parse_config: DLiteExcelParserConfig

    def initialize(self) -> DLiteResult:
        """Return a result referring to the configured collection."""
        return DLiteResult(
            collection_id=get_collection(
                self.parse_config.configuration.collection_id
            ).uuid
        )

    def get(self) -> DLiteExcelSessionUpdate:
        """Parse the Excel file and add its data to the collection.

        This method will be called through the strategy-specific endpoint
        of the OTE-API Services.

        Returns:
            Session update holding the collection ID together with the
            UUID and the label of the newly created instance.

        Raises:
            ValueError: If `downloadUrl` or `mediaType` is missing, or if
                the parsed spreadsheet contains no columns.
            TypeError: If the units extracted from the column names are
                not a list or tuple.

        """
        config = self.parse_config.configuration

        if config.downloadUrl is None:
            raise ValueError("downloadUrl is required.")
        if config.mediaType is None:
            raise ValueError("mediaType is required.")

        # Delegate the actual reading to the generic xlsx parse strategy; a
        # placeholder entity is passed when none is configured.
        xlsx_config = {
            "parserType": "parser/excel_xlsx",
            "configuration": config.excel_config.model_dump(),
            "entity": (
                self.parse_config.entity
                if self.parse_config.entity
                else "https://example.org"
            ),
        }
        xlsx_config["configuration"].update(
            {
                "downloadUrl": config.downloadUrl,
                "mediaType": config.mediaType,
            }
        )
        parser = create_strategy("parse", xlsx_config)
        columns: dict[str, Any] = parser.get()["data"]

        # `zip(*[])` yields nothing, so unpacking into two names would fail
        # with an opaque "not enough values to unpack" ValueError.  Raise
        # the same exception type with an explicit message instead.
        if not columns:
            raise ValueError("No columns found in the parsed Excel data.")

        names, units = zip(*[split_column_name(column) for column in columns])
        rec = dict2recarray(columns, names=names)

        if not isinstance(units, (list, tuple)):
            # This check is to satisfy mypy for the `infer_metadata` call below.
            raise TypeError(
                f"units must be a list or tuple, instead it was {type(units)}"
            )

        meta_uri = self.parse_config.entity
        if meta_uri:
            if config.storage_path is not None:
                # Several storage paths may be given, separated by "|".
                for storage_path in config.storage_path.split("|"):
                    dlite.storage_path.append(storage_path)
            meta = dlite.get_instance(str(meta_uri))
            # check the metadata config would go here
        else:
            meta = infer_metadata(rec, units=units)

        inst = meta(dimensions=[len(rec)], id=config.id)
        for name in names:
            inst[name] = rec[name]

        # `label` is optional in the configuration, but the returned model
        # requires a string.  Fall back to the instance UUID when no label
        # is set, as `DLiteParseStrategy.get()` does.
        label = config.label if config.label is not None else inst.uuid

        # Insert inst into collection
        coll = get_collection(config.collection_id)
        coll.add(label, inst)

        update_collection(coll)
        return DLiteExcelSessionUpdate(
            collection_id=coll.uuid,
            inst_uuid=inst.uuid,
            label=label,
        )

parse_config: DLiteExcelParserConfig instance-attribute

get()

Execute the strategy.

This method will be called through the strategy-specific endpoint of the OTE-API Services.

Returns:

Type Description
DLiteExcelSessionUpdate

Session update with the collection ID, the UUID of the new instance and its label.

Source code in oteapi_dlite/strategies/parse_excel.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def get(self) -> DLiteExcelSessionUpdate:
    """Parse the Excel file and add its data to the collection.

    This method will be called through the strategy-specific endpoint
    of the OTE-API Services.

    Returns:
        Session update holding the collection ID together with the
        UUID and the label of the newly created instance.

    Raises:
        ValueError: If `downloadUrl` or `mediaType` is missing, or if
            the parsed spreadsheet contains no columns.
        TypeError: If the units extracted from the column names are
            not a list or tuple.

    """
    config = self.parse_config.configuration

    if config.downloadUrl is None:
        raise ValueError("downloadUrl is required.")
    if config.mediaType is None:
        raise ValueError("mediaType is required.")

    # Delegate the actual reading to the generic xlsx parse strategy; a
    # placeholder entity is passed when none is configured.
    xlsx_config = {
        "parserType": "parser/excel_xlsx",
        "configuration": config.excel_config.model_dump(),
        "entity": (
            self.parse_config.entity
            if self.parse_config.entity
            else "https://example.org"
        ),
    }
    xlsx_config["configuration"].update(
        {
            "downloadUrl": config.downloadUrl,
            "mediaType": config.mediaType,
        }
    )
    parser = create_strategy("parse", xlsx_config)
    columns: dict[str, Any] = parser.get()["data"]

    # `zip(*[])` yields nothing, so unpacking into two names would fail
    # with an opaque "not enough values to unpack" ValueError.  Raise the
    # same exception type with an explicit message instead.
    if not columns:
        raise ValueError("No columns found in the parsed Excel data.")

    names, units = zip(*[split_column_name(column) for column in columns])
    rec = dict2recarray(columns, names=names)

    if not isinstance(units, (list, tuple)):
        # This check is to satisfy mypy for the `infer_metadata` call below.
        raise TypeError(
            f"units must be a list or tuple, instead it was {type(units)}"
        )

    meta_uri = self.parse_config.entity
    if meta_uri:
        if config.storage_path is not None:
            # Several storage paths may be given, separated by "|".
            for storage_path in config.storage_path.split("|"):
                dlite.storage_path.append(storage_path)
        meta = dlite.get_instance(str(meta_uri))
        # check the metadata config would go here
    else:
        meta = infer_metadata(rec, units=units)

    inst = meta(dimensions=[len(rec)], id=config.id)
    for name in names:
        inst[name] = rec[name]

    # `label` is optional in the configuration, but the returned model
    # requires a string.  Fall back to the instance UUID when no label is
    # set.
    label = config.label if config.label is not None else inst.uuid

    # Insert inst into collection
    coll = get_collection(config.collection_id)
    coll.add(label, inst)

    update_collection(coll)
    return DLiteExcelSessionUpdate(
        collection_id=coll.uuid,
        inst_uuid=inst.uuid,
        label=label,
    )

initialize()

Initialize.

Source code in oteapi_dlite/strategies/parse_excel.py
127
128
129
130
131
132
133
def initialize(self) -> DLiteResult:
    """Return a result referring to the configured collection."""
    collection = get_collection(
        self.parse_config.configuration.collection_id
    )
    return DLiteResult(collection_id=collection.uuid)

infer_metadata(rec, units)

Infer dlite metadata from recarray rec.

Source code in oteapi_dlite/strategies/parse_excel.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def infer_metadata(rec: np.recarray, units: tuple[str, ...]) -> dlite.Instance:
    """Create DLite metadata describing the fields of recarray `rec`.

    The datamodel gets a randomly generated URI, a single dimension
    "nrows" and one property per field of `rec`.  Fields of unicode kind
    are typed "string"; all others use their numpy dtype name.

    Arguments:
        rec: Record array whose fields become properties.
        units: Unit of each field, indexed in the order of the fields.

    Returns:
        The metadata obtained from the populated datamodel.

    """
    random_id = getrandbits(128)
    datamodel = DataModel(
        f"http://onto-ns.com/meta/1.0/generated_from_excel_{random_id:0x}",
        description="Generated datamodel from excel file.",
    )
    datamodel.add_dimension("nrows", "Number of rows.")
    for index, field in enumerate(rec.dtype.names):
        field_dtype = rec[field].dtype
        if field_dtype.kind == "U":
            property_type = "string"
        else:
            property_type = field_dtype.name
        datamodel.add_property(
            field, type=property_type, shape=["nrows"], unit=units[index]
        )
    return datamodel.get()

split_column_name(column)

Split column name into a (name, unit) tuple.

Source code in oteapi_dlite/strategies/parse_excel.py
205
206
207
208
209
210
211
def split_column_name(column: str) -> tuple[str, str]:
    """Separate a column header into its name and unit parts.

    The unit may be wrapped in `()`, `[]` or `<>`.  A header the pattern
    cannot match is returned unchanged together with an empty unit.
    """
    parsed = re.match(r"\s*([^ ([<]+)\s*[([<]?([^] )>]*)[])>]?", column)
    if parsed is None:
        return column, ""
    return parsed.group(1), parsed.group(2)

parse_image

Strategy class for parsing an image to a DLite instance.

LOGGER = logging.getLogger('oteapi_dlite.strategies') module-attribute

DLiteImageConfig

Bases: ImageConfig, DLiteResult

Configuration for DLite image parser.

Source code in oteapi_dlite/strategies/parse_image.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class DLiteImageConfig(ImageConfig, DLiteResult):
    """Configuration for DLite image parser.

    Extends the fields inherited from `ImageConfig` and `DLiteResult`
    with a DLite-specific `mediaType` and the label given to the parsed
    image.
    """

    # Resource config
    # Only the DLite-specific "image/vnd.dlite-*" media types listed below
    # are accepted.  The field description is reused from `ResourceConfig`.
    mediaType: Annotated[
        Optional[
            Literal[
                "image/vnd.dlite-jpg",
                "image/vnd.dlite-jpeg",
                "image/vnd.dlite-jp2",
                "image/vnd.dlite-png",
                "image/vnd.dlite-gif",
                "image/vnd.dlite-tiff",
                "image/vnd.dlite-eps",
            ]
        ],
        Field(description=ResourceConfig.model_fields["mediaType"].description),
    ] = None

    # Parser config
    # Label of the image instance; defaults to "image".
    image_label: Annotated[
        str,
        Field(
            description="Label to assign to the image in the collection.",
        ),
    ] = "image"

image_label: Annotated[str, Field(description='Label to assign to the image in the collection.')] = 'image' class-attribute instance-attribute

mediaType: Annotated[Optional[Literal['image/vnd.dlite-jpg', 'image/vnd.dlite-jpeg', 'image/vnd.dlite-jp2', 'image/vnd.dlite-png', 'image/vnd.dlite-gif', 'image/vnd.dlite-tiff', 'image/vnd.dlite-eps']], Field(description=ResourceConfig.model_fields['mediaType'].description)] = None class-attribute instance-attribute

DLiteImageParseStrategy

Parse strategy for image files.

Registers strategies:

  • ("mediaType", "image/vnd.dlite-gif")
  • ("mediaType", "image/vnd.dlite-jpeg")
  • ("mediaType", "image/vnd.dlite-jpg")
  • ("mediaType", "image/vnd.dlite-jp2")
  • ("mediaType", "image/vnd.dlite-png")
  • ("mediaType", "image/vnd.dlite-tiff")
Source code in oteapi_dlite/strategies/parse_image.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
@dataclass
class DLiteImageParseStrategy:
    """Parse strategy for image files.

    **Registers strategies**:

    - `("mediaType", "image/vnd.dlite-gif")`
    - `("mediaType", "image/vnd.dlite-jpeg")`
    - `("mediaType", "image/vnd.dlite-jpg")`
    - `("mediaType", "image/vnd.dlite-jp2")`
    - `("mediaType", "image/vnd.dlite-png")`
    - `("mediaType", "image/vnd.dlite-tiff")`

    """

    parse_config: DLiteImageParserConfig

    def initialize(self) -> DLiteResult:
        """Return a reference to the collection this strategy works on.

        Returns:
            Reference to the DLite collection ID.

        """
        settings = self.parse_config.configuration
        collection = get_collection(settings.collection_id)
        return DLiteResult(collection_id=collection.uuid)

    def get(self) -> DLiteResult:
        """Execute the strategy.

        This method will be called through the strategy-specific
        endpoint of the OTE-API Services.  The parsing itself is
        delegated to the image parse strategy in oteapi-core.  The image
        data is then fetched from the data cache using the `image_key`
        found in the output of that strategy, and added to the
        collection as a new instance labelled `image_label`.

        Returns:
            Reference to a DLite collection ID.

        Raises:
            ValueError: If `downloadUrl` or `mediaType` is not set.
            TypeError: If the cached image data is neither bytes nor a
                numpy array.

        """
        settings = self.parse_config.configuration

        if settings.downloadUrl is None:
            raise ValueError("downloadUrl is required.")
        if settings.mediaType is None:
            raise ValueError("mediaType is required.")

        # Configuration for ImageDataParseStrategy in oteapi-core.  The
        # media type "image/vnd.dlite-<format>" is passed on as
        # "image/<format>".
        core_settings = settings.model_dump()
        core_settings["mediaType"] = (
            "image/" + settings.mediaType.split("-")[-1]
        )
        core_strategy = create_strategy(
            "parse",
            {
                "parserType": "parser/image",
                "configuration": core_settings,
                "entity": self.parse_config.entity,
            },
        )
        parsed = core_strategy.get()

        image = DataCache().get(parsed["image_key"])
        if isinstance(image, bytes):
            # Raw bytes are decoded with the mode and size reported by
            # the core strategy.
            decoded = Image.frombytes(
                data=image,
                mode=parsed["image_mode"],
                size=parsed["image_size"],
            )
            image = np.asarray(decoded)
        if not isinstance(image, np.ndarray):
            raise TypeError(
                "Expected image data to be a numpy array, instead it was "
                f"{type(image)}."
            )

        image_meta = get_meta(str(self.parse_config.entity))
        instance = image_meta(dimensions=image.shape)
        instance["data"] = image

        collection = get_collection(settings.collection_id)
        collection.add(settings.image_label, instance)
        update_collection(collection)

        return DLiteResult(collection_id=collection.uuid)

parse_config: DLiteImageParserConfig instance-attribute

get()

Execute the strategy.

This method will be called through the strategy-specific endpoint of the OTE-API Services. Parsing is delegated to the image parse strategy in oteapi-core; the image data is then retrieved from the data cache via the image_key returned in the output of that strategy.

Returns:

Type Description
DLiteResult

Reference to a DLite collection ID.

Source code in oteapi_dlite/strategies/parse_image.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def get(self) -> DLiteResult:
    """Execute the strategy.

    This method will be called through the strategy-specific
    endpoint of the OTE-API Services.  The parsing itself is
    delegated to the image parse strategy in oteapi-core.  The image
    data is then fetched from the data cache using the `image_key`
    found in the output of that strategy, and added to the
    collection as a new instance labelled `image_label`.

    Returns:
        Reference to a DLite collection ID.

    Raises:
        ValueError: If `downloadUrl` or `mediaType` is not set.
        TypeError: If the cached image data is neither bytes nor a
            numpy array.

    """
    settings = self.parse_config.configuration

    if settings.downloadUrl is None:
        raise ValueError("downloadUrl is required.")
    if settings.mediaType is None:
        raise ValueError("mediaType is required.")

    # Configuration for ImageDataParseStrategy in oteapi-core.  The
    # media type "image/vnd.dlite-<format>" is passed on as
    # "image/<format>".
    core_settings = settings.model_dump()
    core_settings["mediaType"] = (
        "image/" + settings.mediaType.split("-")[-1]
    )
    core_strategy = create_strategy(
        "parse",
        {
            "parserType": "parser/image",
            "configuration": core_settings,
            "entity": self.parse_config.entity,
        },
    )
    parsed = core_strategy.get()

    image = DataCache().get(parsed["image_key"])
    if isinstance(image, bytes):
        # Raw bytes are decoded with the mode and size reported by the
        # core strategy.
        decoded = Image.frombytes(
            data=image,
            mode=parsed["image_mode"],
            size=parsed["image_size"],
        )
        image = np.asarray(decoded)
    if not isinstance(image, np.ndarray):
        raise TypeError(
            "Expected image data to be a numpy array, instead it was "
            f"{type(image)}."
        )

    image_meta = get_meta(str(self.parse_config.entity))
    instance = image_meta(dimensions=image.shape)
    instance["data"] = image

    collection = get_collection(settings.collection_id)
    collection.add(settings.image_label, instance)
    update_collection(collection)

    return DLiteResult(collection_id=collection.uuid)

initialize()

Initialize.

Source code in oteapi_dlite/strategies/parse_image.py
107
108
109
110
111
112
113
def initialize(self) -> DLiteResult:
    """Return a reference to the collection this strategy works on.

    The collection is looked up with `get_collection()` from the
    `collection_id` found in the parser configuration.

    Returns:
        Reference to the DLite collection ID.

    """
    settings = self.parse_config.configuration
    collection = get_collection(settings.collection_id)
    return DLiteResult(collection_id=collection.uuid)

DLiteImageParserConfig

Bases: ParserConfig

Parser config for DLite image parser.

Source code in oteapi_dlite/strategies/parse_image.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class DLiteImageParserConfig(ParserConfig):
    """Parser config for DLite image parser.

    The `entity` field is pinned to the Image data model URI
    (`http://onto-ns.com/meta/1.0/Image`); any other value is rejected
    by `_validate_entity()`.
    """

    # The only accepted value is the default, "image/vnd.dlite-image".
    parserType: Annotated[
        Literal["image/vnd.dlite-image"],
        Field(description=ParserConfig.model_fields["parserType"].description),
    ] = "image/vnd.dlite-image"

    # Defaults to a `DLiteImageConfig` with all fields at their defaults.
    configuration: Annotated[
        DLiteImageConfig,
        Field(
            description="Image parse strategy-specific configuration.",
        ),
    ] = DLiteImageConfig()

    # Defaults to the Image URI, which is also the only value accepted by
    # the validator below.
    entity: Annotated[
        AnyHttpUrl,  # Keep this type to avoid changing the original type
        Field(description=ParserConfig.model_fields["entity"].description),
    ] = AnyHttpUrl("http://onto-ns.com/meta/1.0/Image")

    @field_validator("entity", mode="after")
    @classmethod
    def _validate_entity(cls, value: AnyHttpUrl) -> AnyHttpUrl:
        """Ensure that the entity is the Image URI.

        Args:
            value: The `entity` value, as passed on by pydantic after its
                own validation (the validator runs in "after" mode).

        Returns:
            `value`, unchanged.

        Raises:
            ValueError: If `value` differs from the fixed Image URI.

        """
        fixed_uri = "http://onto-ns.com/meta/1.0/Image"

        if value != AnyHttpUrl(fixed_uri):
            raise ValueError(f"Entity must be exactly equal to: {fixed_uri}")

        return value

configuration: Annotated[DLiteImageConfig, Field(description='Image parse strategy-specific configuration.')] = DLiteImageConfig() class-attribute instance-attribute

entity: Annotated[AnyHttpUrl, Field(description=ParserConfig.model_fields['entity'].description)] = AnyHttpUrl('http://onto-ns.com/meta/1.0/Image') class-attribute instance-attribute

parserType: Annotated[Literal['image/vnd.dlite-image'], Field(description=ParserConfig.model_fields['parserType'].description)] = 'image/vnd.dlite-image' class-attribute instance-attribute

serialise

Filter for serialisation using DLite.

SerialiseConfig

Bases: DLiteResult

DLite serialise-specific configurations.

Source code in oteapi_dlite/strategies/serialise.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class SerialiseConfig(DLiteResult):
    """DLite serialise-specific configurations.

    `driver` and `location` are required; `options` and `labels` are
    optional.
    """

    # Required: name of the DLite storage plugin to serialise with.
    driver: Annotated[
        str,
        Field(
            description="Name of DLite plugin used for serialisation.",
        ),
    ]
    # NOTE(review): typed as `Path` although the description also mentions
    # URLs.  pathlib collapses "//", so "http://host/x" would be turned
    # into "http:/host/x" -- confirm whether URLs are really supported.
    location: Annotated[
        Path,
        Field(
            description="Path or URL to serialise to.",
        ),
    ]
    # Defaults to the empty string.
    options: Annotated[
        Optional[str],
        Field(
            description="Options passed to the driver.",
        ),
    ] = ""
    # NOTE(review): the `None` default is given both inside `Field()` and
    # by assignment; one of them looks redundant -- confirm.
    labels: Annotated[
        Optional[Sequence[str]],
        Field(
            None,
            description=(
                "Optional sequence of labels in the collection to serialise.  "
                "The default is to serialise the entire collection."
            ),
        ),
    ] = None

driver: Annotated[str, Field(description='Name of DLite plugin used for serialisation.')] instance-attribute

labels: Annotated[Optional[Sequence[str]], Field(None, description='Optional sequence of labels in the collection to serialise. The default is to serialise the entire collection.')] = None class-attribute instance-attribute

location: Annotated[Path, Field(description='Path or URL to serialise to.')] instance-attribute

options: Annotated[Optional[str], Field(description='Options passed to the driver.')] = '' class-attribute instance-attribute

SerialiseFilterConfig

Bases: FilterConfig

Filter config for serialise.

Source code in oteapi_dlite/strategies/serialise.py
51
52
53
54
55
56
57
58
59
class SerialiseFilterConfig(FilterConfig):
    """Filter config for serialise.

    Declares `configuration` as a required `SerialiseConfig`.
    """

    # No default: a configuration must always be supplied.
    configuration: Annotated[
        SerialiseConfig,
        Field(
            description="Serialise-specific configurations.",
        ),
    ]

configuration: Annotated[SerialiseConfig, Field(description='Serialise-specific configurations.')] instance-attribute

SerialiseStrategy

Filter for serialisation using DLite.

Registers strategies:

  • ("filterType", "dlite_serialise")
Source code in oteapi_dlite/strategies/serialise.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@dataclass
class SerialiseStrategy:
    """Filter for serialisation using DLite.

    **Registers strategies**:

    - `("filterType", "dlite_serialise")`

    """

    filter_config: SerialiseFilterConfig

    def initialize(self) -> DLiteResult:
        """Initialize.

        Returns:
            Reference to the collection looked up from the configured
            `collection_id`.

        """
        return DLiteResult(
            collection_id=get_collection(
                self.filter_config.configuration.collection_id
            ).uuid
        )

    def get(self) -> DLiteResult:
        """Execute the strategy.

        Saves the collection to the configured storage.  If `labels` is
        given, only the instances with these labels are saved; otherwise
        the collection itself is saved.

        Returns:
            Reference to the DLite collection ID.

        """
        config = self.filter_config.configuration

        coll = get_collection(config.collection_id)

        # Use the storage as a context manager, such that it is released
        # as soon as serialisation is done (also if saving raises) instead
        # of whenever the storage object happens to be garbage collected.
        with dlite.Storage(
            driver_or_url=config.driver,
            location=str(config.location),
            options=config.options,
        ) as storage:
            if config.labels is None:
                coll.save_to_storage(storage)
            else:
                for label in config.labels:
                    coll.get(label).save_to_storage(storage)

        update_collection(coll)
        return DLiteResult(collection_id=coll.uuid)

filter_config: SerialiseFilterConfig instance-attribute

get()

Execute the strategy.

Source code in oteapi_dlite/strategies/serialise.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def get(self) -> DLiteResult:
    """Execute the strategy.

    Saves the collection to the configured storage.  If `labels` is
    given, only the instances with these labels are saved; otherwise
    the collection itself is saved.

    Returns:
        Reference to the DLite collection ID.

    """
    config = self.filter_config.configuration

    coll = get_collection(config.collection_id)

    # Use the storage as a context manager, such that it is released as
    # soon as serialisation is done (also if saving raises) instead of
    # whenever the storage object happens to be garbage collected.
    with dlite.Storage(
        driver_or_url=config.driver,
        location=str(config.location),
        options=config.options,
    ) as storage:
        if config.labels is None:
            coll.save_to_storage(storage)
        else:
            for label in config.labels:
                coll.get(label).save_to_storage(storage)

    update_collection(coll)
    return DLiteResult(collection_id=coll.uuid)

initialize()

Initialize.

Source code in oteapi_dlite/strategies/serialise.py
74
75
76
77
78
79
80
def initialize(self) -> DLiteResult:
    """Return a reference to the collection this strategy works on.

    The collection is looked up with `get_collection()` from the
    `collection_id` found in the filter configuration.

    Returns:
        Reference to the DLite collection ID.

    """
    settings = self.filter_config.configuration
    collection = get_collection(settings.collection_id)
    return DLiteResult(collection_id=collection.uuid)

settings

Generic strategy for adding configurations to the session.

NoneType = type(None) module-attribute

SettingsConfig

Bases: AttrDict

Configuration for a generic "settings" filter.

This strategy stores settings in the session such that they are available for other strategies. For this to work, this strategy should be added to the end of the pipeline (since it uses the initialize() method).

The settings are stored as a JSON string which can be accessed by its label.

Source code in oteapi_dlite/strategies/settings.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class SettingsConfig(AttrDict):
    """Configuration for a generic "settings" filter.

    This strategy stores settings in the session such that they are
    available for other strategies.  For this to work, this strategy
    should be added to the end of the pipeline (since it uses the
    `initialize()` method).

    The settings are stored as a JSON string which can be accessed
    by its label.

    """

    # Required: key under which the settings are stored.
    label: Annotated[
        str,
        Field(
            description=(
                "Label for accessing this configuration.  "
                "It should be unique."
            ),
        ),
    ]
    # Required: the JSON-serialisable value stored under `label`.
    settings: Annotated[
        JsonValue,
        Field(
            description=(
                "The configurations to be stored, represented as a Python "
                "object that can be serialised to JSON."
            ),
        ),
    ]

label: Annotated[str, Field(description='Label for accessing this configuration. It should be unique.')] instance-attribute

settings: Annotated[JsonValue, Field(description='The configurations to be stored, represented as a Python object that can be serialised to JSON.')] instance-attribute

SettingsFilterConfig

Bases: FilterConfig

Settings strategy config.

Source code in oteapi_dlite/strategies/settings.py
48
49
50
51
52
53
54
class SettingsFilterConfig(FilterConfig):
    """Settings strategy config.

    Declares `configuration` as a required `SettingsConfig`.
    """

    # No default: a configuration must always be supplied.
    configuration: Annotated[
        SettingsConfig,
        Field(description="Settings strategy-specific configuration."),
    ]

configuration: Annotated[SettingsConfig, Field(description='Settings strategy-specific configuration.')] instance-attribute

SettingsStrategy

Generic settings strategy for storing settings for other strategies.

Registers strategies:

  • ("mediaType", "application/vnd.dlite-settings")
Source code in oteapi_dlite/strategies/settings.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@dataclass
class SettingsStrategy:
    """Generic settings strategy for storing settings for other
    strategies.

    **Registers strategies**:

    - `("mediaType", "application/vnd.dlite-settings")`

    """

    filter_config: SettingsFilterConfig

    def initialize(self) -> AttrDict:
        """Store settings.

        Returns:
            An `AttrDict` whose `dlite_settings` entry maps the
            configured label to the configured settings.

        """
        label = self.filter_config.configuration.label
        settings = self.filter_config.configuration.settings
        return AttrDict(dlite_settings={label: settings})

    def get(self) -> AttrDict:
        """Do nothing.

        Returns:
            An empty `AttrDict`.

        """
        return AttrDict()

filter_config: SettingsFilterConfig instance-attribute

get()

Do nothing.

Source code in oteapi_dlite/strategies/settings.py
76
77
78
def get(self) -> AttrDict:
    """Do nothing.

    Returns:
        An empty `AttrDict`.

    """
    return AttrDict()

initialize()

Store settings.

Source code in oteapi_dlite/strategies/settings.py
70
71
72
73
74
def initialize(self) -> AttrDict:
    """Store settings.

    Returns:
        An `AttrDict` whose `dlite_settings` entry maps the configured
        label to the configured settings.

    """
    label = self.filter_config.configuration.label
    settings = self.filter_config.configuration.settings
    return AttrDict(dlite_settings={label: settings})