Skip to content

mapping

Mapping filter strategy.

DLiteMappingConfig

Bases: MappingConfig

DLite mapping strategy config.

Source code in oteapi_dlite/strategies/mapping.py
34
35
36
37
38
39
40
41
42
class DLiteMappingConfig(MappingConfig):
    """DLite mapping strategy config.

    Subclass of `MappingConfig` that declares a `configuration` field of
    type `DLiteMappingStrategyConfig`.
    """

    # Strategy-specific settings. When the field is not supplied, the default
    # is a `DLiteMappingStrategyConfig` constructed with no arguments.
    configuration: Annotated[
        DLiteMappingStrategyConfig,
        Field(
            description="DLite mapping strategy-specific configuration.",
        ),
    ] = DLiteMappingStrategyConfig()

configuration: Annotated[DLiteMappingStrategyConfig, Field(description='DLite mapping strategy-specific configuration.')] = DLiteMappingStrategyConfig() class-attribute instance-attribute

DLiteMappingStrategy

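Strategy for a mapping: binds the configured prefixes and adds the configured triples to the triplestore obtained for the DLite collection.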

Registers strategies:

  • ("mappingType", "mappings")
Source code in oteapi_dlite/strategies/mapping.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@dataclass
class DLiteMappingStrategy:
    """Mapping strategy that loads prefixes and triples into a triplestore.

    **Registers strategies**:

    - `("mappingType", "mappings")`

    """

    mapping_config: DLiteMappingConfig

    def initialize(self) -> DLiteResult:
        """Bind mapping prefixes and add mapping triples to the triplestore.

        The collection is looked up from the configured `collection_id`, and
        the triplestore is obtained with `get_triplestore()` using the
        optional `tripper.triplestore` entry of `dlite_settings` together
        with the collection uuid.

        Returns:
            A `DLiteResult` whose `collection_id` is the collection's uuid.

        Raises:
            ValueError: If the `tripper.triplestore` setting is truthy but
                is not a dictionary (after JSON-decoding a string value).
                A string that is not valid JSON makes `json.loads()` raise.

        """
        strategy_cfg = self.mapping_config.configuration
        collection = get_collection(strategy_cfg.collection_id)

        # The setting may arrive either as a mapping or as a JSON string.
        store_settings = strategy_cfg.dlite_settings.get("tripper.triplestore")
        if isinstance(store_settings, str):
            store_settings = json.loads(store_settings)
        # Falsy values (None, empty containers, ...) skip the type check.
        if store_settings and not isinstance(store_settings, dict):
            raise ValueError(
                "The `tripper.triplestore` setting must be a dictionary."
            )

        if TYPE_CHECKING:  # pragma: no cover
            # Never executed at runtime; narrows the type for mypy only.
            assert (  # nosec
                store_settings is None or isinstance(store_settings, dict)
            )

        triplestore = get_triplestore(
            kb_settings=store_settings,
            collection_id=collection.uuid,
        )

        # A missing or empty prefix mapping results in no bindings.
        for prefix, iri in (self.mapping_config.prefixes or {}).items():
            triplestore.bind(prefix, iri)

        def _expand(term):
            """Expand string terms with the triplestore; keep others as-is."""
            return triplestore.expand_iri(term) if isinstance(term, str) else term

        triples = self.mapping_config.triples
        if triples:
            triplestore.add_triples(
                [[_expand(term) for term in triple] for triple in triples]
            )

        update_collection(collection)
        return DLiteResult(collection_id=collection.uuid)

    def get(self) -> DLiteResult:
        """Return a `DLiteResult` holding the configured collection's uuid."""
        collection = get_collection(
            self.mapping_config.configuration.collection_id
        )
        return DLiteResult(collection_id=collection.uuid)

mapping_config: DLiteMappingConfig instance-attribute

get()

Execute strategy and return a `DLiteResult` holding the uuid of the configured collection.

Source code in oteapi_dlite/strategies/mapping.py
 95
 96
 97
 98
 99
100
101
def get(self) -> DLiteResult:
    """Return a `DLiteResult` holding the configured collection's uuid."""
    collection = get_collection(
        self.mapping_config.configuration.collection_id
    )
    return DLiteResult(collection_id=collection.uuid)

initialize()

Initialize strategy.

Source code in oteapi_dlite/strategies/mapping.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def initialize(self) -> DLiteResult:
    """Bind mapping prefixes and add mapping triples to the triplestore.

    The collection is looked up from the configured `collection_id`, and
    the triplestore is obtained with `get_triplestore()` using the optional
    `tripper.triplestore` entry of `dlite_settings` together with the
    collection uuid.

    Returns:
        A `DLiteResult` whose `collection_id` is the collection's uuid.

    Raises:
        ValueError: If the `tripper.triplestore` setting is truthy but is
            not a dictionary (after JSON-decoding a string value). A string
            that is not valid JSON makes `json.loads()` raise.

    """
    strategy_cfg = self.mapping_config.configuration
    collection = get_collection(strategy_cfg.collection_id)

    # The setting may arrive either as a mapping or as a JSON string.
    store_settings = strategy_cfg.dlite_settings.get("tripper.triplestore")
    if isinstance(store_settings, str):
        store_settings = json.loads(store_settings)
    # Falsy values (None, empty containers, ...) skip the type check.
    if store_settings and not isinstance(store_settings, dict):
        raise ValueError(
            "The `tripper.triplestore` setting must be a dictionary."
        )

    if TYPE_CHECKING:  # pragma: no cover
        # Never executed at runtime; narrows the type for mypy only.
        assert (  # nosec
            store_settings is None or isinstance(store_settings, dict)
        )

    triplestore = get_triplestore(
        kb_settings=store_settings,
        collection_id=collection.uuid,
    )

    # A missing or empty prefix mapping results in no bindings.
    for prefix, iri in (self.mapping_config.prefixes or {}).items():
        triplestore.bind(prefix, iri)

    def _expand(term):
        """Expand string terms with the triplestore; keep others as-is."""
        return triplestore.expand_iri(term) if isinstance(term, str) else term

    triples = self.mapping_config.triples
    if triples:
        triplestore.add_triples(
            [[_expand(term) for term in triple] for triple in triples]
        )

    update_collection(collection)
    return DLiteResult(collection_id=collection.uuid)

DLiteMappingStrategyConfig

Bases: DLiteConfiguration

Configuration for a DLite mapping filter.

Source code in oteapi_dlite/strategies/mapping.py
23
24
25
26
27
28
29
30
31
class DLiteMappingStrategyConfig(DLiteConfiguration):
    """Configuration for a DLite mapping filter.

    Subclass of `DLiteConfiguration` that declares one optional field,
    `datamodel`.
    """

    # Optional URI of the mapped datamodel; defaults to None when not given.
    # NOTE(review): the strategy code shown alongside this class does not
    # read `datamodel` - confirm where (or whether) it is consumed.
    datamodel: Annotated[
        Optional[AnyUrl],
        Field(
            description="URI of the datamodel that is mapped.",
        ),
    ] = None

datamodel: Annotated[Optional[AnyUrl], Field(description='URI of the datamodel that is mapped.')] = None class-attribute instance-attribute