Skip to content

parse

Generic parse strategy using DLite storage plugin.

DLiteParseConfig

Bases: AttrDict

Configuration for generic DLite parser.

Source code in oteapi_dlite/strategies/parse.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class DLiteParseConfig(AttrDict):
    """Configuration for generic DLite parser."""

    # NOTE(review): the class docstring and the `description` texts below are
    # presumably surfaced in generated schemas/docs, so their wording is kept
    # exactly as it was; only the layout of the literals differs.

    # The only field declared without a default.  The parse strategy falls
    # back to `get_driver(mediaType=...)` when this value is falsy.
    driver: Annotated[
        str, Field(description='Name of DLite driver (ex: "json").')
    ]

    # When set, the strategy loads directly from this location and does not
    # consult the data cache.
    location: Annotated[
        Optional[str],
        Field(
            description=(
                "Explicit location of storage.  "
                "Normally data is read from the data cache using "
                "`datacache_config.accessKey` (default: 'key')."
            ),
        ),
    ] = None

    # Forwarded unchanged as `options=` to `dlite.Instance.from_location`.
    options: Annotated[
        Optional[str],
        Field(
            description=(
                "Comma-separated list of options "
                "passed to the DLite storage plugin."
            ),
        ),
    ] = None

    # Forwarded unchanged as `id=` to `dlite.Instance.from_location`.
    id: Annotated[
        Optional[str],
        Field(description="If given, the id of the instance in the storage."),
    ] = None

    # Label used when adding the instance to the collection; the strategy
    # uses the instance UUID when this is falsy.
    label: Annotated[
        Optional[str],
        Field(
            description=(
                "Optional label of the new DLite instance "
                "in the collection."
            ),
        ),
    ] = None

    # Not read by the parse strategy code in this module.
    datamodel: Annotated[
        Optional[str],
        Field(
            description=(
                "DLite datamodel documenting the structure of the data set. "
                "Often unused, since the datamodel is implicitly defined in the "
                "DLite driver (DLite plugin), but for a documentation point of "
                "view this is a very important field."
            ),
        ),
    ] = None

    # The parse strategy only reads `accessKey` from this object.
    datacache_config: Annotated[
        Optional[DataCacheConfig],
        Field(description="Configuration options for the local data cache."),
    ] = None

datacache_config: Annotated[Optional[DataCacheConfig], Field(description='Configuration options for the local data cache.')] = None class-attribute instance-attribute

datamodel: Annotated[Optional[str], Field(description='DLite datamodel documenting the structure of the data set. Often unused, since the datamodel is implicitly defined in the DLite driver (DLite plugin), but for a documentation point of view this is a very important field.')] = None class-attribute instance-attribute

driver: Annotated[str, Field(description='Name of DLite driver (ex: "json").')] instance-attribute

id: Annotated[Optional[str], Field(description='If given, the id of the instance in the storage.')] = None class-attribute instance-attribute

label: Annotated[Optional[str], Field(description='Optional label of the new DLite instance in the collection.')] = None class-attribute instance-attribute

location: Annotated[Optional[str], Field(description="Explicit location of storage. Normally data is read from the data cache using `datacache_config.accessKey` (default: 'key').")] = None class-attribute instance-attribute

options: Annotated[Optional[str], Field(description='Comma-separated list of options passed to the DLite storage plugin.')] = None class-attribute instance-attribute

DLiteParseResourceConfig

Bases: ResourceConfig

DLite parse strategy resource config.

Source code in oteapi_dlite/strategies/parse.py
82
83
84
85
86
87
88
class DLiteParseResourceConfig(ResourceConfig):
    """DLite parse strategy resource config."""

    # Strategy settings, typed as `DLiteParseConfig`; declared without a
    # default value.
    configuration: Annotated[
        DLiteParseConfig,
        Field(
            description="DLite parse strategy-specific configuration.",
        ),
    ]

configuration: Annotated[DLiteParseConfig, Field(description='DLite parse strategy-specific configuration.')] instance-attribute

DLiteParseStrategy

Generic DLite parse strategy utilising DLite storage plugins.

Registers strategies:

  • ("mediaType", "application/vnd.dlite-parse")
Source code in oteapi_dlite/strategies/parse.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
@dataclass
class DLiteParseStrategy:
    """Generic DLite parse strategy utilising DLite storage plugins.

    **Registers strategies**:

    - `("mediaType", "application/vnd.dlite-parse")`

    """

    parse_config: DLiteParseResourceConfig

    def initialize(
        self,
        session: Optional[dict[str, Any]] = None,
    ) -> DLiteSessionUpdate:
        """Report the collection associated with the session.

        Parameters:
            session: A session-specific dictionary context.

        Returns:
            Session update holding the UUID of the collection returned by
            `get_collection(session)`.
        """
        collection = get_collection(session)
        return DLiteSessionUpdate(collection_id=collection.uuid)

    def get(
        self, session: Optional[dict[str, Any]] = None
    ) -> DLiteSessionUpdate:
        """Load a DLite instance and add it to the session's collection.

        The instance is read from `configuration.location` when that is
        set.  Otherwise it is read from the data cache, using the key in
        `configuration.datacache_config.accessKey` or, failing that,
        `session["key"]`.

        This method will be called through the strategy-specific endpoint
        of the OTE-API Services.

        Parameters:
            session: A session-specific dictionary context.

        Returns:
            Session update holding the UUID of the updated collection.

        Raises:
            ValueError: If no location is configured and no cache key is
                found in the configuration or the session.
        """
        resource = self.parse_config
        settings = resource.configuration

        # A configured driver takes precedence; the media type is only
        # consulted when no driver name is given.
        driver = settings.driver or get_driver(mediaType=resource.mediaType)

        # Arguments shared by both ways of loading the instance.
        load_args = {
            "driver": driver,
            "options": settings.options,
            "id": settings.id,
        }

        if settings.location:
            inst = dlite.Instance.from_location(
                location=settings.location, **load_args
            )
        else:
            # Prefer the configured cache key, then the session's "key".
            cache_settings = settings.datacache_config
            key = cache_settings.accessKey if cache_settings else None
            if not key:
                if not (session and "key" in session):
                    raise ValueError(
                        "either `location` or `datacache_config.accessKey` "
                        "must be provided"
                    )
                key = session["key"]

            # Reuse the file suffix of downloadUrl, when there is one.
            url = resource.downloadUrl
            suffix = Path(str(url)).suffix if url else None

            datacache = DataCache()
            with datacache.getfile(key, suffix=suffix) as cached_path:
                inst = dlite.Instance.from_location(
                    location=str(cached_path), **load_args
                )

        # Register the instance under its label, or its UUID if unlabelled.
        collection = get_collection(session)
        collection.add(settings.label or inst.uuid, inst)

        # __TODO__
        # See
        # https://github.com/EMMC-ASBL/oteapi-dlite/pull/84#discussion_r1050437185
        # and following comments.
        #
        # Since we cannot safely assume that all strategies in a
        # pipeline will be executed in the same Python interpreter,
        # the collection should be written to a storage, such that it
        # can be shared with the other strategies.

        update_collection(collection)
        return DLiteSessionUpdate(collection_id=collection.uuid)

parse_config: DLiteParseResourceConfig instance-attribute

get(session=None)

Execute the strategy.

This method will be called through the strategy-specific endpoint of the OTE-API Services.

Parameters:

Name Type Description Default
session Optional[dict[str, Any]]

A session-specific dictionary context.

None

Returns:

Type Description
DLiteSessionUpdate

A DLiteSessionUpdate instance holding the UUID of the updated collection.

Source code in oteapi_dlite/strategies/parse.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def get(
    self, session: Optional[dict[str, Any]] = None
) -> DLiteSessionUpdate:
    """Load a DLite instance and add it to the session's collection.

    The instance is read from `configuration.location` when that is
    set.  Otherwise it is read from the data cache, using the key in
    `configuration.datacache_config.accessKey` or, failing that,
    `session["key"]`.

    This method will be called through the strategy-specific endpoint
    of the OTE-API Services.

    Parameters:
        session: A session-specific dictionary context.

    Returns:
        Session update holding the UUID of the updated collection.

    Raises:
        ValueError: If no location is configured and no cache key is
            found in the configuration or the session.
    """
    resource = self.parse_config
    settings = resource.configuration

    # A configured driver takes precedence; the media type is only
    # consulted when no driver name is given.
    driver = settings.driver or get_driver(mediaType=resource.mediaType)

    # Arguments shared by both ways of loading the instance.
    load_args = {
        "driver": driver,
        "options": settings.options,
        "id": settings.id,
    }

    if settings.location:
        inst = dlite.Instance.from_location(
            location=settings.location, **load_args
        )
    else:
        # Prefer the configured cache key, then the session's "key".
        cache_settings = settings.datacache_config
        key = cache_settings.accessKey if cache_settings else None
        if not key:
            if not (session and "key" in session):
                raise ValueError(
                    "either `location` or `datacache_config.accessKey` "
                    "must be provided"
                )
            key = session["key"]

        # Reuse the file suffix of downloadUrl, when there is one.
        url = resource.downloadUrl
        suffix = Path(str(url)).suffix if url else None

        datacache = DataCache()
        with datacache.getfile(key, suffix=suffix) as cached_path:
            inst = dlite.Instance.from_location(
                location=str(cached_path), **load_args
            )

    # Register the instance under its label, or its UUID if unlabelled.
    collection = get_collection(session)
    collection.add(settings.label or inst.uuid, inst)

    # __TODO__
    # See
    # https://github.com/EMMC-ASBL/oteapi-dlite/pull/84#discussion_r1050437185
    # and following comments.
    #
    # Since we cannot safely assume that all strategies in a
    # pipeline will be executed in the same Python interpreter,
    # the collection should be written to a storage, such that it
    # can be shared with the other strategies.

    update_collection(collection)
    return DLiteSessionUpdate(collection_id=collection.uuid)

initialize(session=None)

Initialize the strategy: return a session update holding the UUID of the session's DLite collection.

Source code in oteapi_dlite/strategies/parse.py
103
104
105
106
107
108
def initialize(
    self,
    session: Optional[dict[str, Any]] = None,
) -> DLiteSessionUpdate:
    """Report the collection associated with the session.

    Parameters:
        session: A session-specific dictionary context.

    Returns:
        Session update holding the UUID of the collection returned by
        `get_collection(session)`.
    """
    collection = get_collection(session)
    return DLiteSessionUpdate(collection_id=collection.uuid)