Skip to content

OTE-API Core Strategies

This page provides documentation for the oteapi.strategies submodule, where all the core OTE-API strategies are located.

These strategies will always be available when setting up a server based on the OTE-API Core package.

download

file

Download strategy class for the file scheme.

DownloadFileContent

Bases: AttrDict

Class for returning values from Download File strategy.

Source code in oteapi/strategies/download/file.py
59
60
61
62
class DownloadFileContent(AttrDict):
    """Class for returning values from Download File strategy."""

    # Required field: the data cache key under which the file content is stored.
    key: str = Field(
        default=...,
        description="Key to access the data in the cache.",
    )
key: str = Field(..., description='Key to access the data in the cache.') class-attribute instance-attribute

FileConfig

Bases: AttrDict

File-specific Configuration Data Model.

Source code in oteapi/strategies/download/file.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class FileConfig(AttrDict):
    """File-specific Configuration Data Model."""

    # Text mode versus bytes mode when the file is read; bytes mode by default.
    text: bool = Field(
        default=False,
        description=(
            "Whether the file should be opened in text mode. "
            "If `False`, the file will be opened in bytes mode."
        ),
    )

    # Optional text encoding; left unset (`None`) by default.
    encoding: Optional[str] = Field(
        default=None,
        description="Encoding used when opening the file. The default is platform dependent.",
    )

    # Optional settings for the data cache that receives the file content.
    datacache_config: Optional[DataCacheConfig] = Field(
        default=None,
        description=(
            "Configurations for the data cache "
            "for storing the downloaded file content."
        ),
    )
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configurations for the data cache for storing the downloaded file content.') class-attribute instance-attribute
encoding: Optional[str] = Field(None, description='Encoding used when opening the file. The default is platform dependent.') class-attribute instance-attribute
text: bool = Field(False, description='Whether the file should be opened in text mode. If `False`, the file will be opened in bytes mode.') class-attribute instance-attribute

FileResourceConfig

Bases: ResourceConfig

File download strategy filter config.

Source code in oteapi/strategies/download/file.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class FileResourceConfig(ResourceConfig):
    """File download strategy filter config."""

    # Required: the URL of the file to read.
    downloadUrl: FileUrl = Field(  # type: ignore[assignment]
        default=...,
        description="The file URL, which will be downloaded.",
    )

    # Strategy-specific options; defaults to a `FileConfig` with default values.
    configuration: FileConfig = Field(
        default=FileConfig(),
        description="File download strategy-specific configuration.",
    )

    @field_validator("downloadUrl")
    @classmethod
    def ensure_path_exists(cls, value: FileUrl) -> FileUrl:
        """Reject a `downloadUrl` whose `path` part is missing or empty.

        Returns the URL unchanged when it has a path; raises `ValueError`
        otherwise.
        """
        if value.path:
            return value
        raise ValueError("downloadUrl must contain a `path` part.")
configuration: FileConfig = Field(FileConfig(), description='File download strategy-specific configuration.') class-attribute instance-attribute
downloadUrl: FileUrl = Field(..., description='The file URL, which will be downloaded.') class-attribute instance-attribute
ensure_path_exists(value) classmethod

Ensure path is defined in downloadUrl.

Source code in oteapi/strategies/download/file.py
50
51
52
53
54
55
56
@field_validator("downloadUrl")
@classmethod
def ensure_path_exists(cls, value: FileUrl) -> FileUrl:
    """Reject a `downloadUrl` whose `path` part is missing or empty.

    Returns the URL unchanged when it has a path; raises `ValueError`
    otherwise.
    """
    if value.path:
        return value
    raise ValueError("downloadUrl must contain a `path` part.")

FileStrategy

Strategy for retrieving data from a local file.

Registers strategies:

  • ("scheme", "file")
Source code in oteapi/strategies/download/file.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@dataclass
class FileStrategy:
    """Strategy for retrieving data from a local file.

    **Registers strategies**:

    - `("scheme", "file")`

    """

    download_config: FileResourceConfig

    def initialize(self) -> AttrDict:
        """Return a new, empty `AttrDict`."""
        empty_result = AttrDict()
        return empty_result

    def get(self) -> DownloadFileContent:
        """Read the local file into the data cache and return its cache key.

        Raises:
            FileNotFoundError: If the resolved path does not exist.

        """
        path = uri_to_path(self.download_config.downloadUrl).resolve()
        if not path.exists():
            raise FileNotFoundError(f"File not found at {path}")

        options = self.download_config.configuration
        cache = DataCache(options.datacache_config)

        # A configured access key that is already cached is reused as-is; the
        # file is not read again in that case.
        if cache.config.accessKey and cache.config.accessKey in cache:
            return DownloadFileContent(key=cache.config.accessKey)

        if options.text:
            content = path.read_text(encoding=options.encoding)
        else:
            content = path.read_bytes()
        return DownloadFileContent(key=cache.add(content))
download_config: FileResourceConfig instance-attribute
get()

Read local file.

Source code in oteapi/strategies/download/file.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def get(self) -> DownloadFileContent:
    """Read the local file into the data cache and return its cache key.

    Raises:
        FileNotFoundError: If the resolved path does not exist.

    """
    path = uri_to_path(self.download_config.downloadUrl).resolve()
    if not path.exists():
        raise FileNotFoundError(f"File not found at {path}")

    options = self.download_config.configuration
    cache = DataCache(options.datacache_config)

    # A configured access key that is already cached is reused as-is; the
    # file is not read again in that case.
    if cache.config.accessKey and cache.config.accessKey in cache:
        return DownloadFileContent(key=cache.config.accessKey)

    if options.text:
        content = path.read_text(encoding=options.encoding)
    else:
        content = path.read_bytes()
    return DownloadFileContent(key=cache.add(content))
initialize()

Initialize.

Source code in oteapi/strategies/download/file.py
77
78
79
def initialize(self) -> AttrDict:
    """Return a new, empty `AttrDict`."""
    empty_result = AttrDict()
    return empty_result

https

Download strategy class for http/https

HTTPDownloadContent

Bases: AttrDict

Class for returning values from Download HTTPS strategy.

Source code in oteapi/strategies/download/https.py
110
111
112
113
class HTTPDownloadContent(AttrDict):
    """Class for returning values from Download HTTPS strategy."""

    # Required field: the data cache key under which the response body is stored.
    key: str = Field(
        default=...,
        description="Key to access the data in the cache.",
    )
key: str = Field(..., description='Key to access the data in the cache.') class-attribute instance-attribute

HTTPSConfig

Bases: AttrDict

HTTP(S)-specific Configuration Data Model.

Source code in oteapi/strategies/download/https.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class HTTPSConfig(AttrDict):
    """HTTP(S)-specific Configuration Data Model."""

    # Optional settings for the data cache that receives the response body.
    datacache_config: Optional[DataCacheConfig] = Field(
        default=None,
        description=(
            "Configurations for the data cache for storing the downloaded file "
            "content."
        ),
    )

    # Upper-cased by `_upper_case_http_method` before the literal check runs.
    http_method: Literal["GET", "POST"] = Field(
        default="GET",
        description=(
            "HTTP method to use for the download request. Only GET and POST are "
            "supported."
        ),
    )

    headers: Optional[dict[str, str]] = Field(
        default=None,
        description="HTTP headers to be included in the download request.",
    )

    cookies: Optional[dict[str, str]] = Field(
        default=None,
        description="Cookies to be included in the download request.",
    )

    query_parameters: Optional[dict[str, Union[str, list[str]]]] = Field(
        default=None,
        description=(
            "Query parameters to be included in the download request. Note, these can "
            "be included directly in the `downloadURL` as well."
        ),
    )

    # `post_body` and `post_body_json` are checked against each other in
    # `_validate_post_bodies`.
    post_body: Optional[Union[dict[str, Any], list[tuple[str, Any]], bytes]] = Field(
        default=None,
        description=(
            "The body of the POST request. This can be a a dictionary, list of tuples "
            "or bytes. This field is mutually exclusive with `post_body_json`."
        ),
    )

    post_body_json: Optional[Any] = Field(
        default=None,
        description=(
            "The body of the POST request as a JSON serializable Python object. This "
            "will be serialized to JSON and sent as the body of the POST request. "
            "This field is mutually exclusive with `post_body`."
        ),
    )

    @field_validator("http_method", mode="before")
    @classmethod
    def _upper_case_http_method(cls, value: Any) -> Any:
        # Only strings are normalised; any other input is passed on untouched.
        return value.upper() if isinstance(value, str) else value

    @model_validator(mode="after")
    def _validate_post_bodies(self) -> HTTPSConfig:
        # Both checks rely on truthiness, so empty bodies count as "not given".
        if self.http_method == "GET":
            if self.post_body or self.post_body_json:
                # A body on a GET request is dropped with a warning, not an error.
                warnings.warn(
                    "POST body is provided for a GET requests - it will be ignored.",
                    stacklevel=2,
                )
                self.post_body = None
                self.post_body_json = None

        if self.post_body and self.post_body_json:
            raise ValueError(
                "Only one of post_body and post_body_json can be provided."
            )

        return self
cookies: Optional[dict[str, str]] = Field(None, description='Cookies to be included in the download request.') class-attribute instance-attribute
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configurations for the data cache for storing the downloaded file content.') class-attribute instance-attribute
headers: Optional[dict[str, str]] = Field(None, description='HTTP headers to be included in the download request.') class-attribute instance-attribute
http_method: Literal['GET', 'POST'] = Field('GET', description='HTTP method to use for the download request. Only GET and POST are supported.') class-attribute instance-attribute
post_body: Optional[Union[dict[str, Any], list[tuple[str, Any]], bytes]] = Field(None, description='The body of the POST request. This can be a a dictionary, list of tuples or bytes. This field is mutually exclusive with `post_body_json`.') class-attribute instance-attribute
post_body_json: Optional[Any] = Field(None, description='The body of the POST request as a JSON serializable Python object. This will be serialized to JSON and sent as the body of the POST request. This field is mutually exclusive with `post_body`.') class-attribute instance-attribute
query_parameters: Optional[dict[str, Union[str, list[str]]]] = Field(None, description='Query parameters to be included in the download request. Note, these can be included directly in the `downloadURL` as well.') class-attribute instance-attribute

HTTPSResourceConfig

Bases: ResourceConfig

HTTP(S) download strategy filter config.

Source code in oteapi/strategies/download/https.py
 99
100
101
102
103
104
105
106
107
class HTTPSResourceConfig(ResourceConfig):
    """HTTP(S) download strategy filter config."""

    # Required: the URL to request.
    downloadUrl: AnyHttpUrl = Field(  # type: ignore[assignment]
        default=...,
        description="The HTTP(S) URL, which will be downloaded.",
    )

    # Strategy-specific options; defaults to an `HTTPSConfig` with default values.
    configuration: HTTPSConfig = Field(
        default=HTTPSConfig(),
        description="HTTP(S) download strategy-specific configuration.",
    )
configuration: HTTPSConfig = Field(HTTPSConfig(), description='HTTP(S) download strategy-specific configuration.') class-attribute instance-attribute
downloadUrl: AnyHttpUrl = Field(..., description='The HTTP(S) URL, which will be downloaded.') class-attribute instance-attribute

HTTPSStrategy

Strategy for retrieving data via http.

Registers strategies:

  • ("scheme", "http")
  • ("scheme", "https")
Source code in oteapi/strategies/download/https.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
@dataclass
class HTTPSStrategy:
    """Strategy for retrieving data via http.

    **Registers strategies**:

    - `("scheme", "http")`
    - `("scheme", "https")`

    """

    download_config: HTTPSResourceConfig

    def initialize(self) -> AttrDict:
        """Initialize."""
        return AttrDict()

    def get(self) -> HTTPDownloadContent:
        """Download via http/https and store on local cache.

        If the configured cache access key is already present in the cache,
        no request is made and that key is returned.

        Returns:
            The cache key under which the response body is stored.

        Raises:
            requests.HTTPError: If the server answers with a 4xx/5xx status.

        """
        config = self.download_config.configuration
        cache = DataCache(config.datacache_config)
        if cache.config.accessKey and cache.config.accessKey in cache:
            key = cache.config.accessKey
        else:
            response = requests.request(
                method=config.http_method,
                url=str(self.download_config.downloadUrl),
                allow_redirects=True,
                timeout=(3, 27),  # timeout: (connect, read) in seconds
                headers=config.headers,
                cookies=config.cookies,
                params=config.query_parameters,
                # No reason to check the method is correct for sending content (POST),
                # since this is validated in the config model.
                data=config.post_body,
                json=config.post_body_json,
            )
            # Fail loudly on an error status instead of caching the error page
            # as if it were the requested content.
            response.raise_for_status()
            key = cache.add(response.content)

        return HTTPDownloadContent(key=key)
download_config: HTTPSResourceConfig instance-attribute
get()

Download via http/https and store on local cache.

Source code in oteapi/strategies/download/https.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def get(self) -> HTTPDownloadContent:
    """Download via http/https and store on local cache.

    If the configured cache access key is already present in the cache,
    no request is made and that key is returned.

    Returns:
        The cache key under which the response body is stored.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.

    """
    config = self.download_config.configuration
    cache = DataCache(config.datacache_config)
    if cache.config.accessKey and cache.config.accessKey in cache:
        key = cache.config.accessKey
    else:
        response = requests.request(
            method=config.http_method,
            url=str(self.download_config.downloadUrl),
            allow_redirects=True,
            timeout=(3, 27),  # timeout: (connect, read) in seconds
            headers=config.headers,
            cookies=config.cookies,
            params=config.query_parameters,
            # No reason to check the method is correct for sending content (POST),
            # since this is validated in the config model.
            data=config.post_body,
            json=config.post_body_json,
        )
        # Fail loudly on an error status instead of caching the error page
        # as if it were the requested content.
        response.raise_for_status()
        key = cache.add(response.content)

    return HTTPDownloadContent(key=key)
initialize()

Initialize.

Source code in oteapi/strategies/download/https.py
129
130
131
def initialize(self) -> AttrDict:
    """Return a new, empty `AttrDict`."""
    empty_result = AttrDict()
    return empty_result

sftp

Strategy class for sftp/ftp

AnyFtpUrl = Annotated[Url, UrlConstraints(allowed_schemes=['ftp', 'sftp'])] module-attribute

SFTPConfig

Bases: AttrDict

(S)FTP-specific Configuration Data Model.

Source code in oteapi/strategies/download/sftp.py
20
21
22
23
24
25
26
27
28
29
class SFTPConfig(AttrDict):
    """(S)FTP-specific Configuration Data Model."""

    # Optional settings for the data cache that receives the downloaded file.
    datacache_config: Optional[DataCacheConfig] = Field(
        default=None,
        description=(
            "Configurations for the data cache "
            "for storing the downloaded file content."
        ),
    )
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configurations for the data cache for storing the downloaded file content.') class-attribute instance-attribute

SFTPContent

Bases: AttrDict

Class for returning values from Download SFTP strategy.

Source code in oteapi/strategies/download/sftp.py
43
44
45
46
class SFTPContent(AttrDict):
    """Class for returning values from Download SFTP strategy."""

    # Required field: the data cache key under which the downloaded file is stored.
    key: str = Field(
        default=...,
        description="Key to access the data in the cache.",
    )
key: str = Field(..., description='Key to access the data in the cache.') class-attribute instance-attribute

SFTPResourceConfig

Bases: ResourceConfig

(S)FTP download strategy filter config.

Source code in oteapi/strategies/download/sftp.py
32
33
34
35
36
37
38
39
40
class SFTPResourceConfig(ResourceConfig):
    """(S)FTP download strategy filter config."""

    # Required: the URL of the remote file.
    downloadUrl: AnyFtpUrl = Field(  # type: ignore[assignment]
        default=...,
        description="The (S)FTP URL, which will be downloaded.",
    )

    # Strategy-specific options; defaults to an `SFTPConfig` with default values.
    configuration: SFTPConfig = Field(
        default=SFTPConfig(),
        description="(S)FTP download strategy-specific configuration.",
    )
configuration: SFTPConfig = Field(SFTPConfig(), description='(S)FTP download strategy-specific configuration.') class-attribute instance-attribute
downloadUrl: AnyFtpUrl = Field(..., description='The (S)FTP URL, which will be downloaded.') class-attribute instance-attribute

SFTPStrategy

Strategy for retrieving data via sftp.

Registers strategies:

  • ("scheme", "ftp")
  • ("scheme", "sftp")
Source code in oteapi/strategies/download/sftp.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@dataclass
class SFTPStrategy:
    """Strategy for retrieving data via sftp.

    **Registers strategies**:

    - `("scheme", "ftp")`
    - `("scheme", "sftp")`

    """

    download_config: SFTPResourceConfig

    def initialize(self) -> AttrDict:
        """Initialize."""
        return AttrDict()

    def get(self) -> SFTPContent:
        """Download via sftp and store the file content in the data cache.

        If the configured cache access key is already present in the cache,
        no connection is opened and that key is returned.

        Returns:
            The cache key under which the downloaded content is stored.

        """
        cache = DataCache(self.download_config.configuration.datacache_config)
        if cache.config.accessKey and cache.config.accessKey in cache:
            key = cache.config.accessKey
        else:
            url = self.download_config.downloadUrl

            # Setup connection options
            cnopts = pysftp.CnOpts()
            # NOTE(security): clearing `hostkeys` turns off host key
            # verification, so the server's identity is not checked. Kept
            # as-is because enforcing it would reject hosts that are
            # accepted today.
            cnopts.hostkeys = None

            # open connection and store data locally
            with pysftp.Connection(
                host=url.host,
                username=url.username,
                password=url.password,
                # The URL carries no port unless one is written explicitly;
                # passing `None` would override pysftp's own default, so fall
                # back to the standard SSH/SFTP port.
                port=url.port or 22,
                cnopts=cnopts,
            ) as sftp:
                # Because of insane locking on Windows, we have to close
                # the downloaded file before adding it to the cache
                with NamedTemporaryFile(prefix="oteapi-sftp-", delete=False) as handle:
                    localpath = Path(handle.name).resolve()
                try:
                    sftp.get(url.path, localpath=localpath)
                    key = cache.add(localpath.read_bytes())
                finally:
                    # `delete=False` above means the temp file must be removed
                    # here, whether or not the download succeeded.
                    localpath.unlink()

        return SFTPContent(key=key)
download_config: SFTPResourceConfig instance-attribute
get()

Download via sftp

Source code in oteapi/strategies/download/sftp.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def get(self) -> SFTPContent:
    """Download via sftp and store the file content in the data cache.

    If the configured cache access key is already present in the cache,
    no connection is opened and that key is returned.

    Returns:
        The cache key under which the downloaded content is stored.

    """
    cache = DataCache(self.download_config.configuration.datacache_config)
    if cache.config.accessKey and cache.config.accessKey in cache:
        key = cache.config.accessKey
    else:
        url = self.download_config.downloadUrl

        # Setup connection options
        cnopts = pysftp.CnOpts()
        # NOTE(security): clearing `hostkeys` turns off host key
        # verification, so the server's identity is not checked. Kept
        # as-is because enforcing it would reject hosts that are
        # accepted today.
        cnopts.hostkeys = None

        # open connection and store data locally
        with pysftp.Connection(
            host=url.host,
            username=url.username,
            password=url.password,
            # The URL carries no port unless one is written explicitly;
            # passing `None` would override pysftp's own default, so fall
            # back to the standard SSH/SFTP port.
            port=url.port or 22,
            cnopts=cnopts,
        ) as sftp:
            # Because of insane locking on Windows, we have to close
            # the downloaded file before adding it to the cache
            with NamedTemporaryFile(prefix="oteapi-sftp-", delete=False) as handle:
                localpath = Path(handle.name).resolve()
            try:
                sftp.get(url.path, localpath=localpath)
                key = cache.add(localpath.read_bytes())
            finally:
                # `delete=False` above means the temp file must be removed
                # here, whether or not the download succeeded.
                localpath.unlink()

    return SFTPContent(key=key)
initialize()

Initialize.

Source code in oteapi/strategies/download/sftp.py
62
63
64
def initialize(self) -> AttrDict:
    """Return a new, empty `AttrDict`."""
    empty_result = AttrDict()
    return empty_result

filter

crop_filter

Demo-filter strategy

CropFilterContent

Bases: AttrDict

Return model for CropImageFilter.

Source code in oteapi/strategies/filter/crop_filter.py
39
40
41
42
43
44
class CropFilterContent(AttrDict):
    """Return model for `CropImageFilter`."""

    # Required: the crop box as four integers.
    imagecrop: tuple[int, int, int, int] = Field(
        default=...,
        description="Box cropping parameters (left, top, right, bottom).",
    )
imagecrop: tuple[int, int, int, int] = Field(..., description='Box cropping parameters (left, top, right, bottom).') class-attribute instance-attribute

CropImageConfig

Bases: AttrDict

Configuration model for crop data.

Source code in oteapi/strategies/filter/crop_filter.py
19
20
21
22
23
24
class CropImageConfig(AttrDict):
    """Configuration model for crop data."""

    # Optional at the model level; unset (`None`) by default.
    crop: Optional[tuple[int, int, int, int]] = Field(
        default=None,
        description="Box cropping parameters (left, top, right, bottom).",
    )
crop: Optional[tuple[int, int, int, int]] = Field(None, description='Box cropping parameters (left, top, right, bottom).') class-attribute instance-attribute

CropImageFilter

Strategy for cropping an image.

Registers strategies:

  • ("filterType", "filter/crop")
Source code in oteapi/strategies/filter/crop_filter.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@dataclass
class CropImageFilter:
    """Strategy for cropping an image.

    **Registers strategies**:

    - `("filterType", "filter/crop")`

    """

    filter_config: CropImageFilterConfig

    def initialize(self) -> CropFilterContent:
        """Return the configured crop box as a `CropFilterContent`.

        Raises:
            ValueError: If no crop box is configured.

        """
        crop_box = self.filter_config.configuration.crop
        if crop_box is None:
            raise ValueError("Crop filter requires crop configuration.")
        return CropFilterContent(imagecrop=crop_box)

    def get(self) -> AttrDict:
        """Return a new, empty `AttrDict`."""
        empty_result = AttrDict()
        return empty_result
filter_config: CropImageFilterConfig instance-attribute
get()

Execute strategy and return a dictionary

Source code in oteapi/strategies/filter/crop_filter.py
68
69
70
def get(self) -> AttrDict:
    """Return a new, empty `AttrDict`."""
    empty_result = AttrDict()
    return empty_result
initialize()

Initialize strategy and return a dictionary.

Source code in oteapi/strategies/filter/crop_filter.py
59
60
61
62
63
64
65
66
def initialize(self) -> CropFilterContent:
    """Return the configured crop box as a `CropFilterContent`.

    Raises:
        ValueError: If no crop box is configured.

    """
    crop_box = self.filter_config.configuration.crop
    if crop_box is None:
        raise ValueError("Crop filter requires crop configuration.")
    return CropFilterContent(imagecrop=crop_box)

CropImageFilterConfig

Bases: FilterConfig

Crop filter strategy filter config.

Source code in oteapi/strategies/filter/crop_filter.py
27
28
29
30
31
32
33
34
35
36
class CropImageFilterConfig(FilterConfig):
    """Crop filter strategy filter config."""

    # Pinned to the single literal value; the description is reused from the
    # parent `FilterConfig` field.
    filterType: Literal["filter/crop"] = Field(
        default="filter/crop",
        description=FilterConfig.model_fields["filterType"].description,
    )

    # Required: the crop-specific options.
    configuration: CropImageConfig = Field(
        default=...,
        description="Image crop filter strategy-specific configuration.",
    )
configuration: CropImageConfig = Field(..., description='Image crop filter strategy-specific configuration.') class-attribute instance-attribute
filterType: Literal['filter/crop'] = Field('filter/crop', description=FilterConfig.model_fields['filterType'].description) class-attribute instance-attribute

sql_query_filter

SQL query filter strategy.

SQLQueryFilter

Strategy for a SQL query filter.

Registers strategies:

  • ("filterType", "filter/sql")
Source code in oteapi/strategies/filter/sql_query_filter.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@dataclass
class SQLQueryFilter:
    """Strategy for a SQL query filter.

    **Registers strategies**:

    - `("filterType", "filter/sql")`

    """

    filter_config: SqlQueryFilterConfig

    def initialize(self) -> SqlQueryContent:
        """Return the configured query string wrapped in a `SqlQueryContent`."""
        query = self.filter_config.query
        return SqlQueryContent(sqlquery=query)

    def get(self) -> AttrDict:
        """Return a new, empty `AttrDict`."""
        empty_result = AttrDict()
        return empty_result
filter_config: SqlQueryFilterConfig instance-attribute
get()

Execute strategy and return a dictionary.

Source code in oteapi/strategies/filter/sql_query_filter.py
50
51
52
def get(self) -> AttrDict:
    """Return a new, empty `AttrDict`."""
    empty_result = AttrDict()
    return empty_result
initialize()

Initialize strategy.

Source code in oteapi/strategies/filter/sql_query_filter.py
46
47
48
def initialize(self) -> SqlQueryContent:
    """Return the configured query string wrapped in a `SqlQueryContent`."""
    query = self.filter_config.query
    return SqlQueryContent(sqlquery=query)

SqlQueryContent

Bases: AttrDict

Class for returning values from SQL Query data model.

Source code in oteapi/strategies/filter/sql_query_filter.py
28
29
30
31
class SqlQueryContent(AttrDict):
    """Class for returning values from SQL Query data model."""

    # Required: the query text.
    sqlquery: str = Field(
        default=...,
        description="A SQL query string.",
    )
sqlquery: str = Field(..., description='A SQL query string.') class-attribute instance-attribute

SqlQueryFilterConfig

Bases: FilterConfig

SQL query filter strategy filter config.

Source code in oteapi/strategies/filter/sql_query_filter.py
18
19
20
21
22
23
24
25
class SqlQueryFilterConfig(FilterConfig):
    """SQL query filter strategy filter config."""

    # Pinned to the single literal value; the description is reused from the
    # parent `FilterConfig` field.
    filterType: Literal["filter/sql"] = Field(
        default="filter/sql",
        description=FilterConfig.model_fields["filterType"].description,
    )

    # Required: the query text.
    query: str = Field(
        default=...,
        description="A SQL query string.",
    )
filterType: Literal['filter/sql'] = Field('filter/sql', description=FilterConfig.model_fields['filterType'].description) class-attribute instance-attribute
query: str = Field(..., description='A SQL query string.') class-attribute instance-attribute

mapping

mapping

Mapping filter strategy.

MappingStrategy

Strategy for a mapping.

The mapping strategy simply adds more prefixes and triples to the prefixes and triples fields in the session such that they are available for other strategies, like function strategies that convert between data models.

Nothing is returned to avoid deleting existing mappings.

Registers strategies:

  • ("mappingType", "triples")
Source code in oteapi/strategies/mapping/mapping.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@dataclass
class MappingStrategy:
    """Strategy for a mapping.

    The mapping strategy simply adds more prefixes and triples to the
    `prefixes` and `triples` fields in the session such that they are
    available for other strategies, like function strategies that convert
    between data models.

    Nothing is returned to avoid deleting existing mappings.

    **Registers strategies**:

    - `("mappingType", "triples")`

    """

    mapping_config: MappingConfig

    def initialize(self) -> MappingStrategyConfig:
        """Return the configured prefixes and triples as a `MappingStrategyConfig`."""
        config = self.mapping_config
        return MappingStrategyConfig(
            prefixes=config.prefixes,
            triples=config.triples,
        )

    def get(self) -> AttrDict:
        """Return a new, empty `AttrDict`."""
        empty_result = AttrDict()
        return empty_result
mapping_config: MappingConfig instance-attribute
get()

Execute strategy and return a dictionary.

Source code in oteapi/strategies/mapping/mapping.py
52
53
54
def get(self) -> AttrDict:
    """Return a new, empty `AttrDict`."""
    empty_result = AttrDict()
    return empty_result
initialize()

Initialize strategy.

Source code in oteapi/strategies/mapping/mapping.py
45
46
47
48
49
50
def initialize(self) -> MappingStrategyConfig:
    """Return the configured prefixes and triples as a `MappingStrategyConfig`."""
    config = self.mapping_config
    return MappingStrategyConfig(
        prefixes=config.prefixes,
        triples=config.triples,
    )

MappingStrategyConfig

Bases: AttrDict

AttrDict model for mappings.

Source code in oteapi/strategies/mapping/mapping.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class MappingStrategyConfig(AttrDict):
    """AttrDict model for mappings."""

    # Required: mapping of string keys to string values.
    prefixes: dict[str, str] = Field(
        default=...,
        description=(
            "Dictionary of shortnames that expands to an IRI "
            "given as local value/IRI-expansion-pairs."
        ),
    )

    # Required: list of `RDFTriple` entries.
    triples: list[RDFTriple] = Field(
        default=...,
        description="List of semantic triples given as (subject, predicate, object).",
    )
prefixes: dict[str, str] = Field(..., description='Dictionary of shortnames that expands to an IRI given as local value/IRI-expansion-pairs.') class-attribute instance-attribute
triples: list[RDFTriple] = Field(..., description='List of semantic triples given as (subject, predicate, object).') class-attribute instance-attribute

parse

application_json

Strategy class for application/json.

JSONConfig

Bases: AttrDict

JSON parse-specific Configuration Data Model.

Source code in oteapi/strategies/parse/application_json.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class JSONConfig(AttrDict):
    """JSON parse-specific Configuration Data Model."""

    # Optional location of the JSON document; unset (`None`) by default.
    downloadUrl: Optional[HostlessAnyUrl] = Field(
        default=None,
        description="The HTTP(S) URL, which will be downloaded.",
    )

    # Pinned to the single literal value.
    mediaType: Literal["application/json"] = Field(
        default="application/json",
        description="The media type",
    )

    # Optional settings for the data cache holding the downloaded content.
    datacache_config: Optional[DataCacheConfig] = Field(
        default=None,
        description=(
            "Configurations for the data cache "
            "for storing the downloaded file content."
        ),
    )
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configurations for the data cache for storing the downloaded file content.') class-attribute instance-attribute
downloadUrl: Optional[HostlessAnyUrl] = Field(None, description='The HTTP(S) URL, which will be downloaded.') class-attribute instance-attribute
mediaType: Literal['application/json'] = Field('application/json', description='The media type') class-attribute instance-attribute

JSONDataParseStrategy

Parse strategy for JSON.

Registers strategies:

  • ("parserType", "parser/json")
Source code in oteapi/strategies/parse/application_json.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@dataclass
class JSONDataParseStrategy:
    """Parse strategy for JSON.

    **Registers strategies**:

    - `("parserType", "parser/json")`

    """

    parse_config: JSONParserConfig

    def initialize(self) -> AttrDict:
        """Return a new, empty `AttrDict`."""
        empty_result = AttrDict()
        return empty_result

    def get(self) -> JSONParseContent:
        """Download the document, read it back from the cache and parse it.

        A download strategy is created from this parser's configuration. The
        cached content is used directly when it is already a `dict`; anything
        else is passed through `json.loads` first.
        """
        config = self.parse_config.configuration

        download_strategy = create_strategy("download", config.model_dump())
        download_result = download_strategy.get()

        cached = DataCache(config.datacache_config).get(download_result["key"])
        parsed = cached if isinstance(cached, dict) else json.loads(cached)
        return JSONParseContent(content=parsed)
parse_config: JSONParserConfig instance-attribute
get()

Parse json.

Source code in oteapi/strategies/parse/application_json.py
75
76
77
78
79
80
81
82
83
84
85
86
def get(self) -> JSONParseContent:
    """Download the configured resource and return it as parsed JSON.

    Returns:
        The JSON document wrapped in a `JSONParseContent`.

    """
    configuration = self.parse_config.configuration

    # The download strategy reports the data cache key under "key".
    download_result = create_strategy(
        "download", configuration.model_dump()
    ).get()
    cached = DataCache(configuration.datacache_config).get(download_result["key"])

    # A dict coming out of the cache is used as-is; anything else is
    # handed to `json.loads()`.
    parsed = cached if isinstance(cached, dict) else json.loads(cached)
    return JSONParseContent(content=parsed)
initialize()

Initialize.

Source code in oteapi/strategies/parse/application_json.py
71
72
73
def initialize(self) -> AttrDict:
    """Initialize the strategy.

    Returns:
        An empty `AttrDict`.

    """
    return AttrDict()

JSONParseContent

Bases: AttrDict

Class for returning values from JSON Parse.

Source code in oteapi/strategies/parse/application_json.py
53
54
55
56
class JSONParseContent(AttrDict):
    """Class for returning values from JSON Parse."""

    # Required field (no default): the parsed JSON document as a dict.
    content: dict = Field(..., description="Content of the JSON document.")
content: dict = Field(..., description='Content of the JSON document.') class-attribute instance-attribute

JSONParserConfig

Bases: ParserConfig

JSON parse strategy parser config.

Source code in oteapi/strategies/parse/application_json.py
41
42
43
44
45
46
47
48
49
50
class JSONParserConfig(ParserConfig):
    """JSON parse strategy parser config."""

    # Fixed to "parser/json"; the description is reused from `ParserConfig`.
    parserType: Literal["parser/json"] = Field(
        "parser/json",
        description=ParserConfig.model_fields["parserType"].description,
    )
    # Required: the JSON-specific settings (`JSONConfig`).
    configuration: JSONConfig = Field(
        ..., description="JSON parse strategy-specific configuration."
    )
configuration: JSONConfig = Field(..., description='JSON parse strategy-specific configuration.') class-attribute instance-attribute
parserType: Literal['parser/json'] = Field('parser/json', description=ParserConfig.model_fields['parserType'].description) class-attribute instance-attribute

application_vnd_sqlite

Strategy class for application/vnd.sqlite3.

SqLiteParseContent

Bases: AttrDict

Class for returning values from the SQLite parse strategy.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
82
83
84
85
class SqLiteParseContent(AttrDict):
    """Class for returning values from the SQLite parse strategy."""

    # Required field (no default): the rows returned by the SQL query.
    result: list = Field(..., description="List of results from the query.")
result: list = Field(..., description='List of results from the query.') class-attribute instance-attribute

SqliteConfig

Bases: AttrDict

Configuration data model for SqliteParseStrategy.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class SqliteConfig(AttrDict):
    """Configuration data model for
    [`SqliteParseStrategy`][oteapi.strategies.parse.application_vnd_sqlite.SqliteParseStrategy].
    """

    # Resource config
    # Both descriptions are reused from the matching `ResourceConfig` fields.
    downloadUrl: Optional[HostlessAnyUrl] = Field(
        None, description=ResourceConfig.model_fields["downloadUrl"].description
    )
    # Only "application/vnd.sqlite3" is accepted; it is also the default.
    mediaType: Literal["application/vnd.sqlite3"] = Field(
        "application/vnd.sqlite3",
        description=ResourceConfig.model_fields["mediaType"].description,
    )

    # SQLite parse strategy-specific config
    # Defaults to an empty string.
    sqlquery: str = Field("", description="A SQL query string.")
    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configuration options for the local data cache.",
    )
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configuration options for the local data cache.') class-attribute instance-attribute
downloadUrl: Optional[HostlessAnyUrl] = Field(None, description=ResourceConfig.model_fields['downloadUrl'].description) class-attribute instance-attribute
mediaType: Literal['application/vnd.sqlite3'] = Field('application/vnd.sqlite3', description=ResourceConfig.model_fields['mediaType'].description) class-attribute instance-attribute
sqlquery: str = Field('', description='A SQL query string.') class-attribute instance-attribute

SqliteParseStrategy

Parse strategy for SQLite.

Purpose of this strategy: Download a SQLite database using downloadUrl and run a SQL query on the database to return all relevant rows.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@dataclass
class SqliteParseStrategy:
    """Parse strategy for SQLite.

    Purpose of this strategy: Download a SQLite database using `downloadUrl` and run a
    SQL query on the database to return all relevant rows.

    """

    parse_config: SqliteParserConfig

    def initialize(self) -> AttrDict:
        """Initialize strategy.

        Returns:
            An empty `AttrDict`.

        """
        return AttrDict()

    def get(self) -> SqLiteParseContent:
        """Parse SQLite query responses.

        Raises:
            ValueError: If no download URL is configured or the media type is
                not `application/vnd.sqlite3`.

        Returns:
            All rows fetched for the configured `sqlquery`.

        """
        config = self.parse_config.configuration

        if config.downloadUrl is None:
            raise ValueError("No download URL provided.")

        if config.mediaType != "application/vnd.sqlite3":
            raise ValueError("Invalid media type.")

        # Retrieve SQLite file
        downloader = create_strategy("download", config.model_dump())
        cache_key = downloader.get()["key"]

        cache = DataCache(config.datacache_config)
        with cache.getfile(cache_key, suffix="db") as filename:
            connection = create_connection(filename)
            try:
                # NOTE: `sqlquery` is executed verbatim as configured.
                result = connection.cursor().execute(config.sqlquery).fetchall()
            finally:
                # Close the connection even when the query fails, so it is
                # never leaked past the lifetime of the file from the cache.
                connection.close()
        return SqLiteParseContent(result=result)
parse_config: SqliteParserConfig instance-attribute
get()

Parse SQLite query responses.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def get(self) -> SqLiteParseContent:
    """Parse SQLite query responses.

    Raises:
        ValueError: If no download URL is configured or the media type is
            not `application/vnd.sqlite3`.

    Returns:
        All rows fetched for the configured `sqlquery`.

    """
    config = self.parse_config.configuration

    if config.downloadUrl is None:
        raise ValueError("No download URL provided.")

    if config.mediaType != "application/vnd.sqlite3":
        raise ValueError("Invalid media type.")

    # Retrieve SQLite file
    downloader = create_strategy("download", config.model_dump())
    cache_key = downloader.get()["key"]

    cache = DataCache(config.datacache_config)
    with cache.getfile(cache_key, suffix="db") as filename:
        connection = create_connection(filename)
        try:
            # NOTE: `sqlquery` is executed verbatim as configured.
            result = connection.cursor().execute(config.sqlquery).fetchall()
        finally:
            # Close the connection even when the query fails, so it is
            # never leaked past the lifetime of the file from the cache.
            connection.close()
    return SqLiteParseContent(result=result)
initialize()

Initialize strategy.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
 99
100
101
def initialize(self) -> AttrDict:
    """Initialize strategy.

    Returns:
        An empty `AttrDict`.

    """
    return AttrDict()

SqliteParserConfig

Bases: ParserConfig

SQLite parse strategy resource config.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
51
52
53
54
55
56
57
58
59
60
class SqliteParserConfig(ParserConfig):
    """SQLite parse strategy resource config."""

    # Fixed to "parser/sqlite3"; the description is reused from `ParserConfig`.
    parserType: Literal["parser/sqlite3"] = Field(
        "parser/sqlite3",
        description=ParserConfig.model_fields["parserType"].description,
    )
    # Required: the SQLite-specific settings (`SqliteConfig`).
    configuration: SqliteConfig = Field(
        ..., description="SQLite parse strategy-specific configuration."
    )
configuration: SqliteConfig = Field(..., description='SQLite parse strategy-specific configuration.') class-attribute instance-attribute
parserType: Literal['parser/sqlite3'] = Field('parser/sqlite3', description=ParserConfig.model_fields['parserType'].description) class-attribute instance-attribute

create_connection(db_file)

Create a database connection to SQLite database.

Parameters:

Name Type Description Default
db_file Path

Full path to SQLite database file.

required

Raises:

Type Description
Error

If a DB connection cannot be made.

Returns:

Type Description
Connection

Connection object.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def create_connection(db_file: Path) -> sqlite3.Connection:
    """Open a connection to a SQLite database file.

    Parameters:
        db_file: Full path to SQLite database file.

    Raises:
        sqlite3.Error: If a DB connection cannot be made. The original
            exception is chained as the cause.

    Returns:
        Connection object.

    """
    try:
        connection = sqlite3.connect(db_file)
    except sqlite3.Error as exc:
        raise sqlite3.Error("Could not connect to given SQLite DB.") from exc
    return connection

excel_xlsx

Strategy class for workbook/xlsx.

XLSXParseConfig

Bases: AttrDict

Data model for retrieving a rectangular section of an Excel sheet.

Source code in oteapi/strategies/parse/excel_xlsx.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class XLSXParseConfig(AttrDict):
    """Data model for retrieving a rectangular section of an Excel sheet."""

    # Resource config
    # Both descriptions are reused from the matching `ResourceConfig` fields.
    downloadUrl: Optional[HostlessAnyUrl] = Field(
        None, description=ResourceConfig.model_fields["downloadUrl"].description
    )
    mediaType: Literal[
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    ] = Field(
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        description=ResourceConfig.model_fields["mediaType"].description,
    )

    # XLSX parse strategy-specific config
    # `worksheet` is the only required field of this model.
    worksheet: str = Field(..., description="Name of worksheet to load.")
    row_from: Optional[int] = Field(
        None,
        description="Excel row number of first row. Defaults to first assigned row.",
    )
    col_from: Optional[Union[int, str]] = Field(
        None,
        description=(
            "Excel column number or label of first column. Defaults to first assigned "
            "column."
        ),
    )
    row_to: Optional[int] = Field(
        None, description="Excel row number of last row. Defaults to last assigned row."
    )
    col_to: Optional[Union[int, str]] = Field(
        None,
        description=(
            "Excel column number or label of last column. Defaults to last assigned "
            "column."
        ),
    )
    header_row: Optional[int] = Field(
        None,
        description=(
            "Row number with the headers. Defaults to `1` if header is given, "
            "otherwise `None`."
        ),
    )
    header: Optional[list[str]] = Field(
        None,
        description=(
            "Optional list of column names, specifying the columns to return. "
            "These names should match cells in `header_row`."
        ),
    )
    new_header: Optional[list[str]] = Field(
        None,
        description=(
            "Optional list of new column names replacing `header` in the output."
        ),
    )
    download_config: AttrDict = Field(
        AttrDict(),
        description="Configurations provided to a download strategy.",
    )
    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description=(
            "Configurations for the data cache for retrieving the downloaded file "
            "content."
        ),
    )
col_from: Optional[Union[int, str]] = Field(None, description='Excel column number or label of first column. Defaults to first assigned column.') class-attribute instance-attribute
col_to: Optional[Union[int, str]] = Field(None, description='Excel column number or label of last column. Defaults to last assigned column.') class-attribute instance-attribute
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configurations for the data cache for retrieving the downloaded file content.') class-attribute instance-attribute
downloadUrl: Optional[HostlessAnyUrl] = Field(None, description=ResourceConfig.model_fields['downloadUrl'].description) class-attribute instance-attribute
download_config: AttrDict = Field(AttrDict(), description='Configurations provided to a download strategy.') class-attribute instance-attribute
header: Optional[list[str]] = Field(None, description='Optional list of column names, specifying the columns to return. These names should match cells in `header_row`.') class-attribute instance-attribute
header_row: Optional[int] = Field(None, description='Row number with the headers. Defaults to `1` if header is given, otherwise `None`.') class-attribute instance-attribute
mediaType: Literal['application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'] = Field('application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', description=ResourceConfig.model_fields['mediaType'].description) class-attribute instance-attribute
new_header: Optional[list[str]] = Field(None, description='Optional list of new column names replacing `header` in the output.') class-attribute instance-attribute
row_from: Optional[int] = Field(None, description='Excel row number of first row. Defaults to first assigned row.') class-attribute instance-attribute
row_to: Optional[int] = Field(None, description='Excel row number of last row. Defaults to last assigned row.') class-attribute instance-attribute
worksheet: str = Field(..., description='Name of worksheet to load.') class-attribute instance-attribute

XLSXParseContent

Bases: AttrDict

Class for returning values from XLSXParse.

Source code in oteapi/strategies/parse/excel_xlsx.py
30
31
32
33
34
35
36
class XLSXParseContent(AttrDict):
    """Class for returning values from XLSXParse."""

    # Required field (no default): maps each column name to its list of values.
    data: dict[str, list] = Field(
        ...,
        description="A dict with column-name/column-value pairs. The values are lists.",
    )
data: dict[str, list] = Field(..., description='A dict with column-name/column-value pairs. The values are lists.') class-attribute instance-attribute

XLSXParseParserConfig

Bases: ParserConfig

XLSX parse strategy resource config.

Source code in oteapi/strategies/parse/excel_xlsx.py
109
110
111
112
113
114
115
116
117
118
class XLSXParseParserConfig(ParserConfig):
    """XLSX parse strategy resource config."""

    # Fixed to "parser/excel_xlsx"; the description is reused from `ParserConfig`.
    parserType: Literal["parser/excel_xlsx"] = Field(
        "parser/excel_xlsx",
        description=ParserConfig.model_fields["parserType"].description,
    )
    # Required: the XLSX-specific settings (`XLSXParseConfig`).
    configuration: XLSXParseConfig = Field(
        ..., description="XLSX parse strategy-specific configuration."
    )
configuration: XLSXParseConfig = Field(..., description='XLSX parse strategy-specific configuration.') class-attribute instance-attribute
parserType: Literal['parser/excel_xlsx'] = Field('parser/excel_xlsx', description=ParserConfig.model_fields['parserType'].description) class-attribute instance-attribute

XLSXParseStrategy

Parse strategy for Excel XLSX files.

Registers strategies:

  • ("parserType", "parser/excel_xlsx")
Source code in oteapi/strategies/parse/excel_xlsx.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
@dataclass
class XLSXParseStrategy:
    """Parse strategy for Excel XLSX files.

    **Registers strategies**:

    - `("parserType", "parser/excel_xlsx")`

    """

    parse_config: XLSXParseParserConfig

    def initialize(self) -> AttrDict:
        """Initialize.

        Returns:
            An empty `AttrDict`.

        """
        return AttrDict()

    def get(self) -> XLSXParseContent:
        """Parses selected region of an excel file.

        Raises:
            RuntimeError: If neither the data cache configuration nor the
                download output provides a cache key.
            TypeError: If `new_header` does not have one entry per column.

        Returns:
            A dict with column-name/column-value pairs. The values are lists.

        """

        config = self.parse_config.configuration

        # Download the file
        download_config = config.model_dump()
        download_config["configuration"] = config.download_config.model_dump()
        output = create_strategy("download", download_config).get()

        # An explicitly configured access key wins over the key reported by
        # the download strategy.
        if config.datacache_config and config.datacache_config.accessKey:
            cache_key = config.datacache_config.accessKey
        elif "key" in output:
            cache_key = output["key"]
        else:
            raise RuntimeError("No data cache key provided to the downloaded content")

        cache = DataCache(config.datacache_config)

        with cache.getfile(key=cache_key, suffix=".xlsx") as filename:
            # Note that we have to set read_only=False to ensure that
            # load_workbook() properly closes the xlsx file after reading.
            # Otherwise Windows will fail when the temporary file is removed
            # when leaving the with statement.
            workbook = load_workbook(filename=filename, read_only=False, data_only=True)

        worksheet = workbook[config.worksheet]
        set_model_defaults(config, worksheet)
        columns = get_column_indices(config, worksheet)

        # `iter_rows()` yields tuples whose first element is the cell in
        # column `first_col` (not column 1), so worksheet column `col` is
        # found at tuple offset `col - first_col`.
        first_col, last_col = min(columns), max(columns)
        offsets = [col - first_col for col in columns]

        data = [
            [row[offset].value for offset in offsets]
            for row in worksheet.iter_rows(
                min_row=config.row_from,
                max_row=config.row_to,
                min_col=first_col,
                max_col=last_col,
            )
        ]

        if config.header_row:
            header_cells = next(
                worksheet.iter_rows(
                    min_row=config.header_row,
                    max_row=config.header_row,
                    min_col=first_col,
                    max_col=last_col,
                )
            )
            header = [header_cells[offset].value for offset in offsets]
        else:
            header = None

        if config.new_header:
            nhead = len(header) if header else len(data[0]) if data else 0
            if len(config.new_header) != nhead:
                raise TypeError(
                    "length of `new_header` "
                    f"(={len(config.new_header)}) "
                    f"doesn't match number of columns (={nhead})"
                )
            if header:
                # `None` entries keep the name read from the worksheet.
                for i, val in enumerate(config.new_header):
                    if val is not None:
                        header[i] = val
            elif data:
                header = config.new_header

        if header is None:
            # One positional label ("A", "B", ...) per returned column. The
            # count must be the number of columns, not the number of rows.
            header = [get_column_letter(col + 1) for col in range(len(columns))]

        transposed = [list(datum) for datum in zip(*data)]
        return XLSXParseContent(data=dict(zip(header, transposed)))
parse_config: XLSXParseParserConfig instance-attribute
get()

Parses selected region of an excel file.

Returns:

Type Description
XLSXParseContent

A dict with column-name/column-value pairs. The values are lists.

Source code in oteapi/strategies/parse/excel_xlsx.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def get(self) -> XLSXParseContent:
    """Parses selected region of an excel file.

    Raises:
        RuntimeError: If neither the data cache configuration nor the
            download output provides a cache key.
        TypeError: If `new_header` does not have one entry per column.

    Returns:
        A dict with column-name/column-value pairs. The values are lists.

    """

    config = self.parse_config.configuration

    # Download the file
    download_config = config.model_dump()
    download_config["configuration"] = config.download_config.model_dump()
    output = create_strategy("download", download_config).get()

    # An explicitly configured access key wins over the key reported by
    # the download strategy.
    if config.datacache_config and config.datacache_config.accessKey:
        cache_key = config.datacache_config.accessKey
    elif "key" in output:
        cache_key = output["key"]
    else:
        raise RuntimeError("No data cache key provided to the downloaded content")

    cache = DataCache(config.datacache_config)

    with cache.getfile(key=cache_key, suffix=".xlsx") as filename:
        # Note that we have to set read_only=False to ensure that
        # load_workbook() properly closes the xlsx file after reading.
        # Otherwise Windows will fail when the temporary file is removed
        # when leaving the with statement.
        workbook = load_workbook(filename=filename, read_only=False, data_only=True)

    worksheet = workbook[config.worksheet]
    set_model_defaults(config, worksheet)
    columns = get_column_indices(config, worksheet)

    # `iter_rows()` yields tuples whose first element is the cell in
    # column `first_col` (not column 1), so worksheet column `col` is
    # found at tuple offset `col - first_col`.
    first_col, last_col = min(columns), max(columns)
    offsets = [col - first_col for col in columns]

    data = [
        [row[offset].value for offset in offsets]
        for row in worksheet.iter_rows(
            min_row=config.row_from,
            max_row=config.row_to,
            min_col=first_col,
            max_col=last_col,
        )
    ]

    if config.header_row:
        header_cells = next(
            worksheet.iter_rows(
                min_row=config.header_row,
                max_row=config.header_row,
                min_col=first_col,
                max_col=last_col,
            )
        )
        header = [header_cells[offset].value for offset in offsets]
    else:
        header = None

    if config.new_header:
        nhead = len(header) if header else len(data[0]) if data else 0
        if len(config.new_header) != nhead:
            raise TypeError(
                "length of `new_header` "
                f"(={len(config.new_header)}) "
                f"doesn't match number of columns (={nhead})"
            )
        if header:
            # `None` entries keep the name read from the worksheet.
            for i, val in enumerate(config.new_header):
                if val is not None:
                    header[i] = val
        elif data:
            header = config.new_header

    if header is None:
        # One positional label ("A", "B", ...) per returned column. The
        # count must be the number of columns, not the number of rows.
        header = [get_column_letter(col + 1) for col in range(len(columns))]

    transposed = [list(datum) for datum in zip(*data)]
    return XLSXParseContent(data=dict(zip(header, transposed)))
initialize()

Initialize.

Source code in oteapi/strategies/parse/excel_xlsx.py
188
189
190
def initialize(self) -> AttrDict:
    """Initialize.

    Returns:
        An empty `AttrDict`.

    """
    return AttrDict()

get_column_indices(model, worksheet)

Helper function returning a list of column indices.

Parameters:

Name Type Description Default
model XLSXParseConfig

The parsed data model.

required
worksheet Worksheet

Excel worksheet, from which the header values will be retrieved.

required

Returns:

Type Description
Iterable[int]

A list of column indices.

Source code in oteapi/strategies/parse/excel_xlsx.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def get_column_indices(model: XLSXParseConfig, worksheet: Worksheet) -> Iterable[int]:
    """Work out which worksheet columns should be returned.

    Without `model.header`, every column from `model.col_from` through
    `model.col_to` (inclusive) is selected. With `model.header`, each name is
    looked up among the cells of row `model.header_row` within that same
    span, and the matching column indices are returned in the order the
    names are listed.

    Parameters:
        model: The parsed data model.
        worksheet: Excel worksheet, from which the header values will be retrieved.

    Raises:
        TypeError: If `model.col_from` or `model.col_to` is not an integer.

    Returns:
        A list of column indices.

    """
    first, last = model.col_from, model.col_to
    if not (isinstance(first, int) and isinstance(last, int)):
        raise TypeError("Expected `model.col_from` and `model.col_to` to be integers.")

    span = range(first, last + 1)
    if not model.header:
        return span

    # Map each header cell value to its column; for duplicated values the
    # right-most column wins.
    column_by_title = {}
    for col in span:
        column_by_title[worksheet.cell(model.header_row, col).value] = col
    return [column_by_title[title] for title in model.header]

set_model_defaults(model, worksheet)

Update data model model with default values obtained from worksheet.

Parameters:

Name Type Description Default
model XLSXParseConfig

The parsed data model.

required
worksheet Worksheet

Excel worksheet, from which the default values will be obtained.

required
Source code in oteapi/strategies/parse/excel_xlsx.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def set_model_defaults(model: XLSXParseConfig, worksheet: Worksheet) -> None:
    """Update data model `model` with default values obtained from `worksheet`.

    Only unset values are filled in; `model` is modified in place. Column
    labels given as strings are converted to column numbers.

    Parameters:
        model: The parsed data model.
        worksheet: Excel worksheet, from which the default values will be obtained.

    """
    # Resolve the header row first: the default first data row depends on it.
    if model.header and not model.header_row:
        model.header_row = 1

    if model.row_from is None:
        if model.header:
            # assume that data starts on the first row after the header
            model.row_from = model.header_row + 1
        else:
            model.row_from = worksheet.min_row

    if model.row_to is None:
        model.row_to = worksheet.max_row

    if model.col_from is None:
        model.col_from = worksheet.min_column
    elif isinstance(model.col_from, str):
        model.col_from = column_index_from_string(model.col_from)

    if model.col_to is None:
        model.col_to = worksheet.max_column
    elif isinstance(model.col_to, str):
        model.col_to = column_index_from_string(model.col_to)

image

Strategy class for image/jpg.

ImageConfig

Bases: AttrDict

Configuration data model for ImageDataParseStrategy.

Source code in oteapi/strategies/parse/image.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class ImageConfig(AttrDict):
    """Configuration data model for
    [`ImageDataParseStrategy`][oteapi.strategies.parse.image.ImageDataParseStrategy]."""

    # Resource config
    # Both descriptions are reused from the matching `ResourceConfig` fields.
    downloadUrl: Optional[HostlessAnyUrl] = Field(
        None, description=ResourceConfig.model_fields["downloadUrl"].description
    )
    # Optional at the model level; restricted to the listed image media types.
    mediaType: Optional[
        Literal[
            "image/jpg",
            "image/jpeg",
            "image/jp2",
            "image/png",
            "image/gif",
            "image/tiff",
            "image/eps",
        ]
    ] = Field(
        None,
        description=ResourceConfig.model_fields["mediaType"].description,
    )

    # Image parse strategy-specific config
    crop: Optional[tuple[int, int, int, int]] = Field(
        None,
        description="Box cropping parameters (left, top, right, bottom).",
        # Effectively mapping 'imagecrop' to 'crop'.
        # 'imagecrop' is used by the crop filter strategy.
        validation_alias=AliasChoices("crop", "imagecrop"),
    )
    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configuration options for the local data cache.",
    )
    image_key: Optional[str] = Field(
        None,
        description="Key to use when storing the image data in datacache.",
    )
    image_mode: Optional[str] = Field(
        None,
        description=(
            "Pillow mode to convert image into. See "
            "https://pillow.readthedocs.io/en/stable/handbook/concepts.html "
            "for details."
        ),
    )
crop: Optional[tuple[int, int, int, int]] = Field(None, description='Box cropping parameters (left, top, right, bottom).', validation_alias=AliasChoices('crop', 'imagecrop')) class-attribute instance-attribute
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configuration options for the local data cache.') class-attribute instance-attribute
downloadUrl: Optional[HostlessAnyUrl] = Field(None, description=ResourceConfig.model_fields['downloadUrl'].description) class-attribute instance-attribute
image_key: Optional[str] = Field(None, description='Key to use when storing the image data in datacache.') class-attribute instance-attribute
image_mode: Optional[str] = Field(None, description='Pillow mode to convert image into. See https://pillow.readthedocs.io/en/stable/handbook/concepts.html for details.') class-attribute instance-attribute
mediaType: Optional[Literal['image/jpg', 'image/jpeg', 'image/jp2', 'image/png', 'image/gif', 'image/tiff', 'image/eps']] = Field(None, description=ResourceConfig.model_fields['mediaType'].description) class-attribute instance-attribute

ImageDataParseStrategy

Parse strategy for images.

This strategy uses Pillow to read a raw image from the data cache, converts it into a NumPy array and stores the new array in the data cache.

It also supports simple cropping and image conversions.

The key to the new array and other metadata is returned. See ImageParseContent for more info.

Source code in oteapi/strategies/parse/image.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
@dataclass
class ImageDataParseStrategy:
    """Parse strategy for images.

    This strategy uses Pillow to read a raw image from the data cache,
    converts it into a NumPy array and stores the new array in the
    data cache.

    It also supports simple cropping and image conversions.

    The key to the new array and other metadata is returned. See
    [`ImageParseContent`][oteapi.strategies.parse.image.ImageParseContent]
    for more info.

    """

    parse_config: ImageParserConfig

    def initialize(self) -> AttrDict:
        """Initialize strategy.

        Returns:
            An empty `AttrDict`.

        """
        return AttrDict()

    def get(self) -> ImageParseContent:
        """Execute the strategy.

        Downloads the image, optionally crops it and converts its mode, stores
        the resulting pixel bytes in the data cache and returns the cache key
        together with size, mode, palette key and image info.

        Raises:
            ValueError: If no media type is configured.
            RuntimeError: If neither the data cache configuration nor the
                download output provides a cache key.

        Returns:
            Keys and metadata describing the stored image data.

        """

        config = self.parse_config.configuration

        if config.mediaType is None:
            raise ValueError("No media type provided to the image parser")

        # The media subtype (the part after "/") selects the Pillow format
        # and is also used as the temporary file suffix below.
        mime_format = config.mediaType.split("/")[1]
        image_format = SupportedFormat[mime_format].value

        # Download the image
        # NOTE(review): the full parser configuration is passed both at the
        # top level and under "configuration" - confirm this is intended.
        download_config = config.model_dump()
        download_config["configuration"] = config.model_dump()
        output = create_strategy("download", download_config).get()

        # An explicitly configured access key wins over the key reported by
        # the download strategy.
        if config.datacache_config and config.datacache_config.accessKey:
            cache_key = config.datacache_config.accessKey
        elif "key" in output:
            cache_key = output["key"]
        else:
            raise RuntimeError("No data cache key provided to the downloaded content")

        cache = DataCache(config.datacache_config)

        # Treat image according to filter values
        with (
            cache.getfile(cache_key, suffix=mime_format) as filename,
            Image.open(filename, formats=[image_format]) as image,
        ):
            final_image: Image.Image | None = None

            if config.crop:
                final_image = image.crop(config.crop)

            # The mode conversion is applied to the cropped image when a crop
            # was requested, otherwise to the image as opened.
            if config.image_mode:
                final_image = (
                    image.convert(mode=config.image_mode)
                    if final_image is None
                    else final_image.convert(mode=config.image_mode)
                )

            # Neither crop nor conversion requested: use the image as opened.
            if final_image is None:
                final_image = image

            # For GIFs, drop a leading b"GIF" from the "version" info entry.
            if image_format == "GIF" and final_image.info.get(
                "version", b""
            ).startswith(b"GIF"):
                final_image.info.update(
                    {"version": final_image.info.get("version", b"")[len(b"GIF") :]}
                )

            # What gets cached is the output of `tobytes()`, stored under
            # `config.image_key` when one is given.
            image_key = cache.add(
                final_image.tobytes(),
                key=config.image_key,
            )

            # Palette images ("P" mode) also get their palette cached.
            if final_image.mode == "P":
                image_palette_key = cache.add(final_image.getpalette())
            else:
                image_palette_key = None

            # The returned content must be json serialisable - filter out all
            # non-json serialisable fields in final_image.info
            if final_image.info:
                image_info = {
                    key: val
                    for key, val in final_image.info.items()
                    if isinstance(val, (str, int, float, type(None), bool, tuple, list))
                }
            else:
                image_info = {}

            return ImageParseContent(
                image_key=image_key,
                image_size=final_image.size,
                image_mode=final_image.mode,
                image_palette_key=image_palette_key,
                image_info=image_info,
            )
parse_config: ImageParserConfig instance-attribute
get()

Execute the strategy.

Source code in oteapi/strategies/parse/image.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def get(self) -> ImageParseContent:
    """Download, optionally transform, and cache the configured image.

    The image is fetched through the "download" strategy, opened with Pillow
    using the format derived from the media type, optionally cropped and/or
    converted to another mode, and its raw bytes are stored in the data cache.

    Returns:
        The cache key(s), size, mode and JSON-serialisable info of the final
        image.

    Raises:
        ValueError: If no media type is configured.
        RuntimeError: If no data cache key is available for the downloaded
            content.

    """
    settings = self.parse_config.configuration

    if settings.mediaType is None:
        raise ValueError("No media type provided to the image parser")

    # The media type sub-type (e.g. "png") selects the Pillow format name.
    subtype = settings.mediaType.split("/")[1]
    pillow_format = SupportedFormat[subtype].value

    # Fetch the image through the download strategy
    payload = settings.model_dump()
    payload["configuration"] = settings.model_dump()
    downloaded = create_strategy("download", payload).get()

    # An explicitly configured access key wins over the key reported by the
    # download strategy.
    cache_settings = settings.datacache_config
    if cache_settings and cache_settings.accessKey:
        source_key = cache_settings.accessKey
    elif "key" in downloaded:
        source_key = downloaded["key"]
    else:
        raise RuntimeError("No data cache key provided to the downloaded content")

    cache = DataCache(cache_settings)

    with (
        cache.getfile(source_key, suffix=subtype) as filename,
        Image.open(filename, formats=[pillow_format]) as image,
    ):
        # Crop first (if requested), then convert the result (if requested).
        processed: Image.Image = image.crop(settings.crop) if settings.crop else image
        if settings.image_mode:
            processed = processed.convert(mode=settings.image_mode)

        # Strip the leading b"GIF" marker from the stored GIF version.
        if pillow_format == "GIF":
            version = processed.info.get("version", b"")
            if version.startswith(b"GIF"):
                processed.info["version"] = version[len(b"GIF") :]

        stored_key = cache.add(processed.tobytes(), key=settings.image_key)

        # Only palette-mode images carry a palette worth caching.
        palette_key = (
            cache.add(processed.getpalette()) if processed.mode == "P" else None
        )

        # The returned content must be json serialisable - drop every entry
        # of the Pillow info mapping whose value is of another type.
        serialisable = (str, int, float, type(None), bool, tuple, list)
        if processed.info:
            info = {
                name: item
                for name, item in processed.info.items()
                if isinstance(item, serialisable)
            }
        else:
            info = {}

        return ImageParseContent(
            image_key=stored_key,
            image_size=processed.size,
            image_mode=processed.mode,
            image_palette_key=palette_key,
            image_info=info,
        )
initialize()

Initialize strategy.

Source code in oteapi/strategies/parse/image.py
151
152
153
def initialize(self) -> AttrDict:
    """Initialize strategy.

    Returns:
        An empty `AttrDict`; nothing is computed at initialization.

    """
    return AttrDict()

ImageParseContent

Bases: AttrDict

Configuration model for the returned content from the Image parser.

See Pillow handbook for more details on image_mode, image_palette, and image_info.

Source code in oteapi/strategies/parse/image.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class ImageParseContent(AttrDict):
    """Model for the content returned from the Image parser.

    See
    [Pillow handbook](https://pillow.readthedocs.io/en/stable/handbook/concepts.html)
    for more details on `image_mode`, the colour palette referenced by
    `image_palette_key`, and `image_info`.
    """

    # Data cache key under which the raw image bytes were stored.
    image_key: str = Field(
        ...,
        description="Key with which the image content is stored in the data cache.",
    )
    image_size: tuple[int, int] = Field(
        ...,
        description="Image size (width, height).",
    )
    image_mode: str = Field(
        ...,
        description="Image mode. Examples: 'L', 'P', 'RGB', 'RGBA'...",
    )
    # Left as `None` unless the final image is in palette mode ('P').
    image_palette_key: Optional[str] = Field(
        None,
        description="Datacache key for colour palette if mode is 'P'.",
    )
    # Populated by the parser with the JSON-serialisable entries of Pillow's
    # `Image.info` mapping only.
    image_info: dict = Field(
        {},
        description="Additional information about the image.",
    )
image_info: dict = Field({}, description='Additional information about the image.') class-attribute instance-attribute
image_key: str = Field(..., description='Key with which the image content is stored in the data cache.') class-attribute instance-attribute
image_mode: str = Field(..., description="Image mode. Examples: 'L', 'P', 'RGB', 'RGBA'...") class-attribute instance-attribute
image_palette_key: Optional[str] = Field(None, description="Datacache key for colour palette if mode is 'P'.") class-attribute instance-attribute
image_size: tuple[int, int] = Field(..., description='Image size (width, height).') class-attribute instance-attribute

ImageParserConfig

Bases: ParserConfig

Image parse strategy resource config.

Source code in oteapi/strategies/parse/image.py
78
79
80
81
82
83
84
85
86
87
88
class ImageParserConfig(ParserConfig):
    """Image parse strategy resource config.

    Narrows `parserType` to the literal `"parser/image"` and requires an
    `ImageConfig` as the strategy-specific `configuration`.
    """

    # The description is reused from the corresponding `ParserConfig` field.
    parserType: Literal["parser/image"] = Field(
        "parser/image",
        description=ParserConfig.model_fields["parserType"].description,
    )
    configuration: ImageConfig = Field(
        ...,
        description="Image parse strategy-specific configuration.",
    )
configuration: ImageConfig = Field(..., description='Image parse strategy-specific configuration.') class-attribute instance-attribute
parserType: Literal['parser/image'] = Field('parser/image', description=ParserConfig.model_fields['parserType'].description) class-attribute instance-attribute

SupportedFormat

Bases: Enum

Supported formats for ImageDataParseStrategy.

Source code in oteapi/strategies/parse/image.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class SupportedFormat(Enum):
    """Supported formats for `ImageDataParseStrategy`.

    Member names are media type sub-types (the part after the `/`); the
    values are the format names handed to Pillow's `Image.open(formats=...)`.
    """

    jpeg = "JPEG"
    # Same value as `jpeg`, hence an Enum alias of it (lint rule silenced).
    jpg = "JPEG"  # noqa: PIE796
    jp2 = "JPEG2000"
    png = "PNG"
    gif = "GIF"
    tiff = "TIFF"
    eps = "EPS"
eps = 'EPS' class-attribute instance-attribute
gif = 'GIF' class-attribute instance-attribute
jp2 = 'JPEG2000' class-attribute instance-attribute
jpeg = 'JPEG' class-attribute instance-attribute
jpg = 'JPEG' class-attribute instance-attribute
png = 'PNG' class-attribute instance-attribute
tiff = 'TIFF' class-attribute instance-attribute

postgres

Strategy class for application/vnd.postgresql

PostgresConfig

Bases: AttrDict

Configuration data model for PostgresParserStrategy.

Source code in oteapi/strategies/parse/postgres.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
class PostgresConfig(AttrDict):
    """Strategy-specific configuration used by
    [`PostgresParserStrategy`][oteapi.strategies.parse.postgres.PostgresParserStrategy].
    """

    # --- Fields mirroring the resource config ---
    accessService: Literal["postgres"] = Field(
        "postgres",
        description=ResourceConfig.model_fields["accessService"].description,
    )
    accessUrl: Optional[HostlessAnyUrl] = Field(
        None,
        description=ResourceConfig.model_fields["accessUrl"].description,
    )

    # --- Postgres-only fields ---
    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configuration options for the local data cache.",
    )
    user: Optional[str] = Field(None, description="postgres server username")
    dbname: Optional[str] = Field(None, description="postgres dbname name")
    password: Optional[str] = Field(None, description="postgres password")
    sqlquery: str = Field("", description="A SQL query string.")

    @model_validator(mode="before")
    @classmethod
    def adjust_url(cls, data: Any) -> dict[str, Any]:
        """Reconcile the `accessUrl` with the separately configured credentials.

        Runs before field validation. User, password and database name are
        taken from the URL when present there, otherwise from the
        `"configuration"` entry of `data` (falling back to the model
        defaults). The `accessUrl` entry of `data` is then rebuilt from the
        resolved values.

        Parameters:
            data: Raw input, either a dict or a pydantic model.

        Raises:
            TypeError: If `data` is neither a dict nor a pydantic model.
            ValueError: If the URL has no host, or if a value given in both
                places differs.

        Returns:
            The (possibly updated) input data as a dict.

        """
        if isinstance(data, BaseModel):
            data = data.model_dump()
        if not isinstance(data, dict):
            raise TypeError(
                "invalid data type, should be either dict or pydantic model"
            )

        # Nothing to reconcile without a URL.
        if "accessUrl" not in data:
            return data

        url = AnyUrl(data["accessUrl"])
        defaults = PostgresConfig()
        overrides: dict[str, Any] = data.get("configuration", {})

        if not url.host:
            raise ValueError("missing host in accessUrl")

        def _resolve(url_attr: str, key: str) -> Optional[str]:
            """Pick the URL value if set, else the configured one; reject conflicts."""
            from_url = getattr(url, url_attr, None)
            from_config = overrides.get(key, getattr(defaults, key))

            resolved = from_url or from_config

            if from_config and resolved != from_config:
                raise ValueError(
                    f"mismatching {url_attr} in accessUrl and {key} in "
                    "configuration"
                )

            return resolved

        resolved_user = _resolve("username", "user")
        resolved_password = _resolve("password", "password")
        resolved_dbname = _resolve("path", "dbname")

        # Rebuild the URL so it carries the resolved credentials and database.
        data["accessUrl"] = url.__class__.build(
            scheme=url.scheme,
            username=resolved_user,
            password=resolved_password,
            host=url.host,
            port=url.port,
            path=resolved_dbname,
            query=url.query,
            fragment=url.fragment,
        )
        return data
accessService: Literal['postgres'] = Field('postgres', description=ResourceConfig.model_fields['accessService'].description) class-attribute instance-attribute
accessUrl: Optional[HostlessAnyUrl] = Field(None, description=ResourceConfig.model_fields['accessUrl'].description) class-attribute instance-attribute
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configuration options for the local data cache.') class-attribute instance-attribute
dbname: Optional[str] = Field(None, description='postgres dbname name') class-attribute instance-attribute
password: Optional[str] = Field(None, description='postgres password') class-attribute instance-attribute
sqlquery: str = Field('', description='A SQL query string.') class-attribute instance-attribute
user: Optional[str] = Field(None, description='postgres server username') class-attribute instance-attribute
adjust_url(data) classmethod

Model validator: verifies configuration consistency, merges configurations, and updates the accessUrl property.

Source code in oteapi/strategies/parse/postgres.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@model_validator(mode="before")
@classmethod
def adjust_url(cls, data: Any) -> dict[str, Any]:
    """Reconcile the `accessUrl` with the separately configured credentials.

    Runs before field validation. User, password and database name are
    taken from the URL when present there, otherwise from the
    `"configuration"` entry of `data` (falling back to the model defaults).
    The `accessUrl` entry of `data` is then rebuilt from the resolved values.

    Parameters:
        data: Raw input, either a dict or a pydantic model.

    Raises:
        TypeError: If `data` is neither a dict nor a pydantic model.
        ValueError: If the URL has no host, or if a value given in both
            places differs.

    Returns:
        The (possibly updated) input data as a dict.

    """
    if isinstance(data, BaseModel):
        data = data.model_dump()
    if not isinstance(data, dict):
        raise TypeError(
            "invalid data type, should be either dict or pydantic model"
        )

    # Nothing to reconcile without a URL.
    if "accessUrl" not in data:
        return data

    url = AnyUrl(data["accessUrl"])
    defaults = PostgresConfig()
    overrides: dict[str, Any] = data.get("configuration", {})

    if not url.host:
        raise ValueError("missing host in accessUrl")

    def _resolve(url_attr: str, key: str) -> Optional[str]:
        """Pick the URL value if set, else the configured one; reject conflicts."""
        from_url = getattr(url, url_attr, None)
        from_config = overrides.get(key, getattr(defaults, key))

        resolved = from_url or from_config

        if from_config and resolved != from_config:
            raise ValueError(
                f"mismatching {url_attr} in accessUrl and {key} in "
                "configuration"
            )

        return resolved

    resolved_user = _resolve("username", "user")
    resolved_password = _resolve("password", "password")
    resolved_dbname = _resolve("path", "dbname")

    # Rebuild the URL so it carries the resolved credentials and database.
    data["accessUrl"] = url.__class__.build(
        scheme=url.scheme,
        username=resolved_user,
        password=resolved_password,
        host=url.host,
        port=url.port,
        path=resolved_dbname,
        query=url.query,
        fragment=url.fragment,
    )
    return data

PostgresParserConfig

Bases: ParserConfig

Postgresql parse strategy config

Source code in oteapi/strategies/parse/postgres.py
111
112
113
114
115
116
117
118
119
120
121
122
123
class PostgresParserConfig(ParserConfig):
    """Postgresql parse strategy config.

    Narrows `parserType` to the literal `"parser/postgres"` and requires a
    `PostgresConfig` as the strategy-specific `configuration`.
    """

    parserType: Literal["parser/postgres"] = Field(
        "parser/postgres",
        description="Type of registered resource strategy.",
    )
    # Validated by `PostgresConfig`, whose `adjust_url` validator reconciles
    # the access URL with the separately given credentials.
    configuration: PostgresConfig = Field(
        ...,
        description=(
            "Configuration for resource. Values in the accessURL take precedence."
        ),
    )
configuration: PostgresConfig = Field(..., description='Configuration for resource. Values in the accessURL take precedence.') class-attribute instance-attribute
parserType: Literal['parser/postgres'] = Field('parser/postgres', description='Type of registered resource strategy.') class-attribute instance-attribute

PostgresParserContent

Bases: AttrDict

Model for the content returned by PostgresParserStrategy.

Source code in oteapi/strategies/parse/postgres.py
145
146
147
148
class PostgresParserContent(AttrDict):
    """Content returned by `PostgresParserStrategy.get()`."""

    # Whatever the cursor's `fetchall()` returned for the configured query.
    result: list = Field(..., description="List of results from the query.")
result: list = Field(..., description='List of results from the query.') class-attribute instance-attribute

PostgresParserStrategy

Resource strategy for Postgres.

Purpose of this strategy: Connect to a postgres DB and run a SQL query on the dbname to return all relevant rows.

Source code in oteapi/strategies/parse/postgres.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@dataclass
class PostgresParserStrategy:
    """Resource strategy for Postgres.

    Purpose of this strategy: Connect to a postgres DB and run a
    SQL query on the dbname to return all relevant rows.

    """

    parser_config: PostgresParserConfig

    def initialize(self) -> AttrDict:
        """Initialize strategy.

        Returns:
            An empty `AttrDict`; nothing is computed at initialization.

        """
        return AttrDict()

    def get(self) -> PostgresParserContent:
        """Run the configured SQL query and return all fetched rows.

        Raises:
            ValueError: If no `accessUrl` is configured.

        Returns:
            The fetched rows wrapped in a `PostgresParserContent`.

        """

        if self.parser_config.configuration.accessUrl is None:
            raise ValueError("accessUrl is required for PostgresParserStrategy")

        connection = create_connection(str(self.parser_config.configuration.accessUrl))
        # Close the connection even if the query fails, so a bad query does
        # not leak an open database connection.
        try:
            cursor = connection.cursor()
            result = cursor.execute(
                self.parser_config.configuration.sqlquery
            ).fetchall()
        finally:
            connection.close()
        return PostgresParserContent(result=result)
parser_config: PostgresParserConfig instance-attribute
get()

Resource Postgres query responses.

Source code in oteapi/strategies/parse/postgres.py
166
167
168
169
170
171
172
173
174
175
176
def get(self) -> PostgresParserContent:
    """Run the configured SQL query and return all fetched rows.

    Raises:
        ValueError: If no `accessUrl` is configured.

    Returns:
        The fetched rows wrapped in a `PostgresParserContent`.

    """

    if self.parser_config.configuration.accessUrl is None:
        raise ValueError("accessUrl is required for PostgresParserStrategy")

    connection = create_connection(str(self.parser_config.configuration.accessUrl))
    # Close the connection even if the query fails, so a bad query does not
    # leak an open database connection.
    try:
        cursor = connection.cursor()
        result = cursor.execute(self.parser_config.configuration.sqlquery).fetchall()
    finally:
        connection.close()
    return PostgresParserContent(result=result)
initialize()

Initialize strategy.

Source code in oteapi/strategies/parse/postgres.py
162
163
164
def initialize(self) -> AttrDict:
    """Initialize strategy.

    Returns:
        An empty `AttrDict`; nothing is computed at initialization.

    """
    return AttrDict()

create_connection(url)

Create a connection to a Postgres database.

Parameters:

Name Type Description Default
url str

A valid PostgreSQL URL.

required

Raises:

Type Description
Error

If a DB connection cannot be made.

Returns:

Type Description
Connection

Connection object.

Source code in oteapi/strategies/parse/postgres.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def create_connection(url: str) -> psycopg.Connection:
    """Create a connection to a Postgres database.

    Parameters:
        url: A valid PostgreSQL URL.

    Raises:
        psycopg.Error: If a DB connection cannot be made. The error raised by
            `psycopg.connect` is attached as the cause.

    Returns:
        Connection object.

    """
    try:
        return psycopg.connect(url)
    except psycopg.Error as exc:
        # Re-raise with a generic message, chained to the original error.
        raise psycopg.Error("Could not connect to given Postgres DB.") from exc

text_csv

Strategy class for parser/csv.

CSVDialect: type[Enum] = Enum(value='CSVDialect', names={dialect.upper(): dialect for dialect in csv.list_dialects()}, module=__name__, type=str) module-attribute

CSV dialects.

All available dialects are retrieved through the csv.list_dialects() function, and will thus depend on the currently loaded and used Python interpreter.

CSVConfig

Bases: AttrDict

CSV parse-specific Configuration Data Model.

Source code in oteapi/strategies/parse/text_csv.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
class CSVConfig(AttrDict):
    """CSV parse-specific Configuration Data Model.

    Combines the resource fields needed to download the file with the
    dialect and reader options handed to `csv.DictReader`.
    """

    # Resource config
    downloadUrl: Optional[HostlessAnyUrl] = Field(
        None,
        description=ResourceConfig.model_fields["downloadUrl"].description,
    )
    # Fixed to "text/csv"; no other media type is accepted by this model.
    mediaType: Literal["text/csv"] = Field(
        "text/csv",
        description=ResourceConfig.model_fields["mediaType"].description,
    )

    # CSV parse strategy-specific configuration
    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description=(
            "Configurations for the data cache for storing the downloaded file "
            "content."
        ),
    )
    # Defaults to a `DialectFormatting` with no field explicitly set.
    dialect: DialectFormatting = Field(
        DialectFormatting(),
        description=(
            "Dialect and formatting parameters. See [the Python docs]"
            "(https://docs.python.org/3/library/csv.html#csv-fmt-params) for more "
            "information."
        ),
    )
    # Defaults to a `ReaderConfig` with no field explicitly set.
    reader: ReaderConfig = Field(
        ReaderConfig(),
        description=(
            "CSV DictReader configuration parameters. See [the Python docs]"
            "(https://docs.python.org/3/library/csv.html#csv.DictReader) for more "
            "information."
        ),
    )
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configurations for the data cache for storing the downloaded file content.') class-attribute instance-attribute
dialect: DialectFormatting = Field(DialectFormatting(), description='Dialect and formatting parameters. See [the Python docs](https://docs.python.org/3/library/csv.html#csv-fmt-params) for more information.') class-attribute instance-attribute
downloadUrl: Optional[HostlessAnyUrl] = Field(None, description=ResourceConfig.model_fields['downloadUrl'].description) class-attribute instance-attribute
mediaType: Literal['text/csv'] = Field('text/csv', description=ResourceConfig.model_fields['mediaType'].description) class-attribute instance-attribute
reader: ReaderConfig = Field(ReaderConfig(), description='CSV DictReader configuration parameters. See [the Python docs](https://docs.python.org/3/library/csv.html#csv.DictReader) for more information.') class-attribute instance-attribute

CSVParseContent

Bases: AttrDict

Class for returning values from CSV Parse.

Source code in oteapi/strategies/parse/text_csv.py
273
274
275
276
277
278
class CSVParseContent(AttrDict):
    """Class for returning values from CSV Parse."""

    # Column-oriented: each field name maps to the list of that field's
    # values, in the order the rows were read.
    content: dict[Union[str, None], list[Any]] = Field(
        ..., description="Content of the CSV document."
    )
content: dict[Union[str, None], list[Any]] = Field(..., description='Content of the CSV document.') class-attribute instance-attribute

CSVParseStrategy

Parse strategy for CSV files.

Source code in oteapi/strategies/parse/text_csv.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
@dataclass
class CSVParseStrategy:
    """Parse strategy for CSV files."""

    parse_config: CSVParserConfig

    def initialize(self) -> AttrDict:
        """Initialize; returns an empty `AttrDict`."""
        return AttrDict()

    def get(self) -> CSVParseContent:
        """Download the CSV file and return its content column by column.

        The file is fetched through the "download" strategy, read with
        `csv.DictReader` using the configured dialect and reader options,
        and collected into one list of values per field name. Columns that
        contain floats or ints are then normalised to a single numeric type.

        Raises:
            RuntimeError: If no data cache key is available for the
                downloaded content.

        Returns:
            The parsed columns wrapped in a `CSVParseContent`.

        """
        settings = self.parse_config.configuration

        # Fetch the file through the download strategy
        payload = settings.model_dump()
        payload["configuration"] = settings.model_dump()
        downloaded = create_strategy("download", payload).get()

        # An explicitly configured access key wins over the key reported by
        # the download strategy.
        cache_settings = settings.datacache_config
        if cache_settings and cache_settings.accessKey:
            source_key = cache_settings.accessKey
        elif "key" in downloaded:
            source_key = downloaded["key"]
        else:
            raise RuntimeError("No data cache key provided to the downloaded content")

        cache = DataCache(cache_settings)

        with cache.getfile(source_key) as csvfile_path:
            # Only explicitly set options are forwarded to the reader.
            options = settings.dialect.model_dump(
                exclude={"base", "quoting"}, exclude_unset=True
            )

            base_dialect = settings.dialect.base
            if base_dialect:
                options["dialect"] = base_dialect.value
            quoting_mode = settings.dialect.quoting
            if quoting_mode:
                options["quoting"] = quoting_mode.csv_constant()

            options.update(settings.reader.model_dump(exclude_unset=True))

            with csvfile_path.open(
                newline="",
                encoding=settings.reader.encoding,
            ) as handle:
                parsed = csv.DictReader(handle, **options)
                columns: dict[Union[str, None], list[Any]] = defaultdict(list)
                for record in parsed:
                    for name, cell in record.items():
                        # Under QUOTE_NONNUMERIC, whole-number floats are
                        # stored as ints.
                        is_whole_float = (
                            parsed.reader.dialect.quoting == csv.QUOTE_NONNUMERIC
                            and isinstance(cell, float)
                            and cell.is_integer()
                        )
                        columns[name].append(int(cell) if is_whole_float else cell)

        missing = parsed.restval
        for name in list(columns):
            cells = columns[name]
            if any(isinstance(cell, float) for cell in cells):
                # Float column: empty or missing cells become NaN.
                columns[name] = [
                    (
                        float(cell)
                        if (cell or cell == 0.0 or cell == 0) and cell != missing
                        else float("nan")
                    )
                    for cell in cells
                ]
            elif any(isinstance(cell, int) for cell in cells):
                # Int column: empty or missing cells become the rest value.
                columns[name] = [
                    (
                        int(cell)
                        if (cell or cell == 0) and cell != missing
                        else missing
                    )
                    for cell in cells
                ]

        return CSVParseContent(content=columns)
parse_config: CSVParserConfig instance-attribute
get()

Parse CSV.

Source code in oteapi/strategies/parse/text_csv.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
def get(self) -> CSVParseContent:
    """Download the CSV file and return its content column by column.

    The file is fetched through the "download" strategy, read with
    `csv.DictReader` using the configured dialect and reader options, and
    collected into one list of values per field name. Columns that contain
    floats or ints are then normalised to a single numeric type.

    Raises:
        RuntimeError: If no data cache key is available for the downloaded
            content.

    Returns:
        The parsed columns wrapped in a `CSVParseContent`.

    """
    settings = self.parse_config.configuration

    # Fetch the file through the download strategy
    payload = settings.model_dump()
    payload["configuration"] = settings.model_dump()
    downloaded = create_strategy("download", payload).get()

    # An explicitly configured access key wins over the key reported by the
    # download strategy.
    cache_settings = settings.datacache_config
    if cache_settings and cache_settings.accessKey:
        source_key = cache_settings.accessKey
    elif "key" in downloaded:
        source_key = downloaded["key"]
    else:
        raise RuntimeError("No data cache key provided to the downloaded content")

    cache = DataCache(cache_settings)

    with cache.getfile(source_key) as csvfile_path:
        # Only explicitly set options are forwarded to the reader.
        options = settings.dialect.model_dump(
            exclude={"base", "quoting"}, exclude_unset=True
        )

        base_dialect = settings.dialect.base
        if base_dialect:
            options["dialect"] = base_dialect.value
        quoting_mode = settings.dialect.quoting
        if quoting_mode:
            options["quoting"] = quoting_mode.csv_constant()

        options.update(settings.reader.model_dump(exclude_unset=True))

        with csvfile_path.open(
            newline="",
            encoding=settings.reader.encoding,
        ) as handle:
            parsed = csv.DictReader(handle, **options)
            columns: dict[Union[str, None], list[Any]] = defaultdict(list)
            for record in parsed:
                for name, cell in record.items():
                    # Under QUOTE_NONNUMERIC, whole-number floats are stored
                    # as ints.
                    is_whole_float = (
                        parsed.reader.dialect.quoting == csv.QUOTE_NONNUMERIC
                        and isinstance(cell, float)
                        and cell.is_integer()
                    )
                    columns[name].append(int(cell) if is_whole_float else cell)

    missing = parsed.restval
    for name in list(columns):
        cells = columns[name]
        if any(isinstance(cell, float) for cell in cells):
            # Float column: empty or missing cells become NaN.
            columns[name] = [
                (
                    float(cell)
                    if (cell or cell == 0.0 or cell == 0) and cell != missing
                    else float("nan")
                )
                for cell in cells
            ]
        elif any(isinstance(cell, int) for cell in cells):
            # Int column: empty or missing cells become the rest value.
            columns[name] = [
                (
                    int(cell)
                    if (cell or cell == 0) and cell != missing
                    else missing
                )
                for cell in cells
            ]

    return CSVParseContent(content=columns)
initialize()

Initialize.

Source code in oteapi/strategies/parse/text_csv.py
287
288
289
def initialize(self) -> AttrDict:
    """Initialize.

    Returns:
        An empty `AttrDict`; nothing is computed at initialization.

    """
    return AttrDict()

CSVParserConfig

Bases: ParserConfig

CSV parse strategy filter config.

Source code in oteapi/strategies/parse/text_csv.py
261
262
263
264
265
266
267
268
269
270
class CSVParserConfig(ParserConfig):
    """CSV parse strategy config.

    Narrows `parserType` to the literal `"parser/csv"` and requires a
    `CSVConfig` as the strategy-specific `configuration`.
    """

    # The description is reused from the corresponding `ParserConfig` field.
    parserType: Literal["parser/csv"] = Field(
        "parser/csv",
        description=ParserConfig.model_fields["parserType"].description,
    )
    configuration: CSVConfig = Field(
        ..., description="CSV parse strategy-specific configuration."
    )
configuration: CSVConfig = Field(..., description='CSV parse strategy-specific configuration.') class-attribute instance-attribute
parserType: Literal['parser/csv'] = Field('parser/csv', description=ParserConfig.model_fields['parserType'].description) class-attribute instance-attribute

DialectFormatting

Bases: BaseModel

Dialect and formatting parameters for CSV.

See the Python docs for more information.

Note

As Dialect.lineterminator is hardcoded in csv.reader, it is left out of this model.

Source code in oteapi/strategies/parse/text_csv.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
class DialectFormatting(BaseModel):
    """Dialect and formatting parameters for CSV.

    See [the Python docs](https://docs.python.org/3/library/csv.html#csv-fmt-params)
    for more information.

    Every field is optional and defaults to `None`.

    Note:
        As `Dialect.lineterminator` is hardcoded in `csv.reader`, it is left out of
        this model.

    """

    base: Optional[CSVDialect] = Field(
        None,
        description=(
            "A specific CSV dialect, e.g., 'excel'. Any other parameters here will "
            "overwrite the preset dialect parameters for the specified dialect."
        ),
    )
    # Single-character fields are enforced through `min_length`/`max_length`.
    delimiter: Optional[str] = Field(
        None,
        description=(
            "A one-character string used to separate fields. "
            "See [the Python docs entry](https://docs.python.org/3/library/csv.html"
            "#csv.Dialect.delimiter) for more information."
        ),
        min_length=1,
        max_length=1,
    )
    doublequote: Optional[bool] = Field(
        None,
        description=(
            "Controls how instances of [`quotechar`]"
            "[oteapi.strategies.parse.text_csv.DialectFormatting.quotechar] "
            "appearing inside a field should themselves be quoted. When `True`, the "
            "character is doubled. When `False`, the [`escapechar`]"
            "[oteapi.strategies.parse.text_csv.DialectFormatting.escapechar] "
            "is used as a prefix to the [`quotechar`]"
            "[oteapi.strategies.parse.text_csv.DialectFormatting.quotechar]. "
            "See [the Python docs entry]"
            "(https://docs.python.org/3/library/csv.html#csv.Dialect.doublequote) "
            "for more information."
        ),
    )
    escapechar: Optional[str] = Field(
        None,
        description=(
            "A one-character string used by the writer to escape the [`delimiter`]"
            "[oteapi.strategies.parse.text_csv.DialectFormatting.delimiter] if "
            "[`quoting`][oteapi.strategies.parse.text_csv.DialectFormatting.quoting] "
            "is set to [`QUOTE_NONE`]"
            "[oteapi.strategies.parse.text_csv.QuoteConstants.QUOTE_NONE] and the "
            "[`quotechar`][oteapi.strategies.parse.text_csv.DialectFormatting."
            "quotechar] if [`doublequote`][oteapi.strategies.parse.text_csv."
            "DialectFormatting.doublequote] is `False`. On reading, the "
            "[`escapechar`][oteapi.strategies.parse.text_csv.DialectFormatting."
            "escapechar] removes any special meaning from the following character. "
            "See [the Python docs entry]"
            "(https://docs.python.org/3/library/csv.html#csv.Dialect.escapechar) "
            "for more information."
        ),
        min_length=1,
        max_length=1,
    )
    quotechar: Optional[str] = Field(
        None,
        description=(
            "A one-character string used to quote fields containing special "
            "characters, such as the [`delimiter`]"
            "[oteapi.strategies.parse.text_csv.DialectFormatting.delimiter] or "
            "[`quotechar`][oteapi.strategies.parse.text_csv.DialectFormatting."
            "quotechar], or which contain new-line characters. See "
            "[the Python docs entry](https://docs.python.org/3/library/csv.html"
            "#csv.Dialect.quotechar) for more information."
        ),
        min_length=1,
        max_length=1,
    )
    quoting: Optional[QuoteConstants] = Field(
        None,
        description=(
            "Controls when quotes should be generated by the writer and recognised by "
            "the reader. It can take on any of the `QUOTE_*` constants (see section "
            "[Module Contents](https://docs.python.org/3/library/csv.html"
            "#csv-contents)). See [the Python docs entry]"
            "(https://docs.python.org/3/library/csv.html#csv.Dialect.quoting) "
            "for more information."
        ),
    )
    skipinitialspace: Optional[bool] = Field(
        None,
        description=(
            "When `True`, whitespace immediately following the [`delimiter`]"
            "[oteapi.strategies.parse.text_csv.DialectFormatting.delimiter] is "
            "ignored. See [the Python docs entry]"
            "(https://docs.python.org/3/library/csv.html#csv.Dialect.skipinitialspace)"
            " for more information."
        ),
    )
    strict: Optional[bool] = Field(
        None,
        description=(
            "When `True`, raise exception [Error]"
            "(https://docs.python.org/3/library/csv.html#csv.Error) on bad CSV input. "
            "See [the Python docs entry](https://docs.python.org/3/library/csv.html"
            "#csv.Dialect.strict) for more information."
        ),
    )

    @field_validator("base")
    @classmethod
    def validate_dialect_base(cls, value: Optional[str]) -> Optional[str]:
        """Ensure the given `base` dialect is registered locally.

        Parameters:
            value: The supplied `base` dialect, or `None` when no base dialect is
                requested.

        Returns:
            The unchanged `value`.

        Raises:
            ValueError: If `value` is not `None` and is not among the dialects
                returned by `csv.list_dialects()`.

        """
        # `base` is optional: an explicitly supplied `None` (for instance when a
        # dumped model is validated again) means "no base dialect" and must not be
        # rejected as an unknown dialect name.
        if value is None:
            return value

        registered_dialects = csv.list_dialects()
        if value not in registered_dialects:
            raise ValueError(
                f"{value!r} is not a known registered CSV dialect. "
                f"Registered dialects: {', '.join(registered_dialects)}."
            )
        return value
base: Optional[CSVDialect] = Field(None, description="A specific CSV dialect, e.g., 'excel'. Any other parameters here will overwrite the preset dialect parameters for the specified dialect.") class-attribute instance-attribute
delimiter: Optional[str] = Field(None, description='A one-character string used to separate fields. See [the Python docs entry](https://docs.python.org/3/library/csv.html#csv.Dialect.delimiter) for more information.', min_length=1, max_length=1) class-attribute instance-attribute
doublequote: Optional[bool] = Field(None, description='Controls how instances of [`quotechar`][oteapi.strategies.parse.text_csv.DialectFormatting.quotechar] appearing inside a field should themselves be quoted. When `True`, the character is doubled. When `False`, the [`escapechar`][oteapi.strategies.parse.text_csv.DialectFormatting.escapechar] is used as a prefix to the [`quotechar`][oteapi.strategies.parse.text_csv.DialectFormatting.quotechar]. See [the Python docs entry](https://docs.python.org/3/library/csv.html#csv.Dialect.doublequote) for more information.') class-attribute instance-attribute
escapechar: Optional[str] = Field(None, description='A one-character string used by the writer to escape the [`delimiter`][oteapi.strategies.parse.text_csv.DialectFormatting.delimiter] if [`quoting`][oteapi.strategies.parse.text_csv.DialectFormatting.quoting] is set to [`QUOTE_NONE`][oteapi.strategies.parse.text_csv.QuoteConstants.QUOTE_NONE] and the [`quotechar`][oteapi.strategies.parse.text_csv.DialectFormatting.quotechar] if [`doublequote`][oteapi.strategies.parse.text_csv.DialectFormatting.doublequote] is `False`. On reading, the [`escapechar`][oteapi.strategies.parse.text_csv.DialectFormatting.escapechar] removes any special meaning from the following character. See [the Python docs entry](https://docs.python.org/3/library/csv.html#csv.Dialect.escapechar) for more information.', min_length=1, max_length=1) class-attribute instance-attribute
quotechar: Optional[str] = Field(None, description='A one-character string used to quote fields containing special characters, such as the [`delimiter`][oteapi.strategies.parse.text_csv.DialectFormatting.delimiter] or [`quotechar`][oteapi.strategies.parse.text_csv.DialectFormatting.quotechar], or which contain new-line characters. See [the Python docs entry](https://docs.python.org/3/library/csv.html#csv.Dialect.quotechar) for more information.', min_length=1, max_length=1) class-attribute instance-attribute
quoting: Optional[QuoteConstants] = Field(None, description='Controls when quotes should be generated by the writer and recognised by the reader. It can take on any of the `QUOTE_*` constants (see section [Module Contents](https://docs.python.org/3/library/csv.html#csv-contents)). See [the Python docs entry](https://docs.python.org/3/library/csv.html#csv.Dialect.quoting) for more information.') class-attribute instance-attribute
skipinitialspace: Optional[bool] = Field(None, description='When `True`, whitespace immediately following the [`delimiter`][oteapi.strategies.parse.text_csv.DialectFormatting.delimiter] is ignored. See [the Python docs entry](https://docs.python.org/3/library/csv.html#csv.Dialect.skipinitialspace) for more information.') class-attribute instance-attribute
strict: Optional[bool] = Field(None, description='When `True`, raise exception [Error](https://docs.python.org/3/library/csv.html#csv.Error) on bad CSV input. See [the Python docs entry](https://docs.python.org/3/library/csv.html#csv.Dialect.strict) for more information.') class-attribute instance-attribute
validate_dialect_base(value) classmethod

Ensure the given base dialect is registered locally.

Source code in oteapi/strategies/parse/text_csv.py
172
173
174
175
176
177
178
179
180
181
@field_validator("base")
@classmethod
def validate_dialect_base(cls, value: Optional[str]) -> Optional[str]:
    """Ensure the given `base` dialect is registered locally.

    Parameters:
        value: The supplied `base` dialect, or `None` when no base dialect is
            requested.

    Returns:
        The unchanged `value`.

    Raises:
        ValueError: If `value` is not `None` and is not among the dialects
            returned by `csv.list_dialects()`.

    """
    # `base` is optional: an explicitly supplied `None` means "no base dialect"
    # and must not be rejected as an unknown dialect name.
    if value is None:
        return value

    registered_dialects = csv.list_dialects()
    if value not in registered_dialects:
        raise ValueError(
            f"{value!r} is not a known registered CSV dialect. "
            f"Registered dialects: {', '.join(registered_dialects)}."
        )
    return value

QuoteConstants

Bases: str, Enum

CSV module QUOTE_* constants.

Source code in oteapi/strategies/parse/text_csv.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class QuoteConstants(str, Enum):
    """CSV module `QUOTE_*` constants.

    Each member's value is the name of the matching constant in the standard
    library `csv` module.
    """

    QUOTE_ALL = "QUOTE_ALL"
    QUOTE_MINIMAL = "QUOTE_MINIMAL"
    # Misspelled member name, kept (and still the canonical member) so that
    # existing references keep working.
    QUOTE_NONUMERIC = "QUOTE_NONNUMERIC"
    # Correctly spelled alias: same value, hence the very same member object.
    QUOTE_NONNUMERIC = "QUOTE_NONNUMERIC"
    QUOTE_NONE = "QUOTE_NONE"

    def csv_constant(self) -> int:
        """Return the CSV lib equivalent constant.

        Returns:
            The `csv` module attribute whose name equals this member's value,
            e.g. `csv.QUOTE_ALL` for `QuoteConstants.QUOTE_ALL`.

        """
        # Member values are exactly the `csv` attribute names, so a direct
        # attribute lookup replaces a mapping that was rebuilt on every call.
        return getattr(csv, self.value)
QUOTE_ALL = 'QUOTE_ALL' class-attribute instance-attribute
QUOTE_MINIMAL = 'QUOTE_MINIMAL' class-attribute instance-attribute
QUOTE_NONE = 'QUOTE_NONE' class-attribute instance-attribute
QUOTE_NONUMERIC = 'QUOTE_NONNUMERIC' class-attribute instance-attribute
csv_constant()

Return the CSV lib equivalent constant.

Source code in oteapi/strategies/parse/text_csv.py
39
40
41
42
43
44
45
46
def csv_constant(self) -> int:
    """Translate this member into the matching `csv` module constant."""
    # Pair every member with its `csv` counterpart, then look up `self`.
    members = (
        self.QUOTE_ALL,
        self.QUOTE_MINIMAL,
        self.QUOTE_NONUMERIC,
        self.QUOTE_NONE,
    )
    constants = (
        csv.QUOTE_ALL,
        csv.QUOTE_MINIMAL,
        csv.QUOTE_NONNUMERIC,
        csv.QUOTE_NONE,
    )
    lookup = dict(zip(members, constants))
    return lookup[self]

ReaderConfig

Bases: BaseModel

CSV DictReader configuration parameters.

See the Python docs for more information.

Source code in oteapi/strategies/parse/text_csv.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
class ReaderConfig(BaseModel):
    """Configuration parameters for the CSV `DictReader`.

    More information is available in
    [the Python docs](https://docs.python.org/3/library/csv.html#csv.DictReader).
    """

    fieldnames: Optional[list[str]] = Field(
        default=None,
        description=(
            "List of headers. If not set, the values in the first row of the CSV file "
            "will be used as the field names."
        ),
    )
    restkey: Optional[Hashable] = Field(
        default=None,
        description=(
            "If a row has more fields than [`fieldnames`]"
            "[oteapi.strategies.parse.text_csv.ReaderConfig.fieldnames], the "
            "remaining data is put in a list and stored with the field name specified "
            "by [`restkey`][oteapi.strategies.parse.text_csv.ReaderConfig.restkey]."
        ),
    )
    restval: Optional[Any] = Field(
        default=None,
        description=(
            "If a non-blank row has fewer fields than the length of [`fieldnames`]"
            "[oteapi.strategies.parse.text_csv.ReaderConfig.fieldnames], the missing "
            "values are filled-in with the value of [`restval`]"
            "[oteapi.strategies.parse.text_csv.ReaderConfig.restval]."
        ),
    )
    # The only field with a non-`None` default.
    encoding: str = Field(default="utf8", description="The file encoding.")
encoding: str = Field('utf8', description='The file encoding.') class-attribute instance-attribute
fieldnames: Optional[list[str]] = Field(None, description='List of headers. If not set, the values in the first row of the CSV file will be used as the field names.') class-attribute instance-attribute
restkey: Optional[Hashable] = Field(None, description='If a row has more fields than [`fieldnames`][oteapi.strategies.parse.text_csv.ReaderConfig.fieldnames], the remaining data is put in a list and stored with the field name specified by [`restkey`][oteapi.strategies.parse.text_csv.ReaderConfig.restkey].') class-attribute instance-attribute
restval: Optional[Any] = Field(None, description='If a non-blank row has fewer fields than the length of [`fieldnames`][oteapi.strategies.parse.text_csv.ReaderConfig.fieldnames], the missing values are filled-in with the value of [`restval`][oteapi.strategies.parse.text_csv.ReaderConfig.restval].') class-attribute instance-attribute

resource

resource_url

Strategy class for resource/url.

ResourceURLConfig

Bases: ResourceConfig

Resource URL strategy config.

Source code in oteapi/strategies/resource/resource_url.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class ResourceURLConfig(ResourceConfig):
    """Resource configuration model for the `resource/url` strategy.

    All field descriptions are reused from the parent `ResourceConfig` fields.
    """

    # Only the literal "resource/url" is accepted.
    resourceType: Literal["resource/url"] = Field(
        default="resource/url",
        description=ResourceConfig.model_fields["resourceType"].description,
    )
    # Declared here as required fields (no default).
    downloadUrl: HostlessAnyUrl = Field(
        default=...,
        description=ResourceConfig.model_fields["downloadUrl"].description,
    )
    mediaType: str = Field(
        default=...,
        description=ResourceConfig.model_fields["mediaType"].description,
    )
downloadUrl: HostlessAnyUrl = Field(..., description=ResourceConfig.model_fields['downloadUrl'].description) class-attribute instance-attribute
mediaType: str = Field(..., description=ResourceConfig.model_fields['mediaType'].description) class-attribute instance-attribute
resourceType: Literal['resource/url'] = Field('resource/url', description=ResourceConfig.model_fields['resourceType'].description) class-attribute instance-attribute

ResourceURLStrategy

Basic resource strategy targeting downloadUrl resources.

Source code in oteapi/strategies/resource/resource_url.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@dataclass
class ResourceURLStrategy:
    """Resource strategy for resources described by a `downloadUrl`."""

    resource_config: ResourceURLConfig

    def initialize(self) -> AttrDict:
        """Return an empty `AttrDict`; no initialization work is performed."""
        empty_result = AttrDict()
        return empty_result

    def get(self) -> AttrDict:
        """Return the resource configuration as an `AttrDict`.

        The configuration is dumped in JSON mode. `resourceType` and any field
        that was never explicitly set are left out.
        """
        dumped_config = self.resource_config.model_dump(
            mode="json",
            exclude_unset=True,
            exclude={"resourceType"},
        )
        return AttrDict(**dumped_config)
resource_config: ResourceURLConfig instance-attribute
get()

Return the resource configuration as an AttrDict, omitting resourceType and any fields that were not explicitly set.

Source code in oteapi/strategies/resource/resource_url.py
45
46
47
48
49
50
51
def get(self) -> AttrDict:
    """Return the resource configuration as an `AttrDict`.

    The configuration is dumped in JSON mode. `resourceType` and any field
    that was never explicitly set are left out.
    """
    dumped_config = self.resource_config.model_dump(
        mode="json",
        exclude_unset=True,
        exclude={"resourceType"},
    )
    return AttrDict(**dumped_config)
initialize()

Initialize.

Source code in oteapi/strategies/resource/resource_url.py
41
42
43
def initialize(self) -> AttrDict:
    """Return an empty `AttrDict`; no initialization work is performed."""
    empty_result = AttrDict()
    return empty_result

transformation

celery_remote

Transformation Plugin that uses the Celery framework to call remote workers.

CELERY_APP = Celery(broker=f'redis://{REDIS_HOST}:{REDIS_PORT}', backend=f'redis://{REDIS_HOST}:{REDIS_PORT}') module-attribute

REDIS_HOST = os.getenv('OTEAPI_REDIS_HOST', 'redis') module-attribute

REDIS_PORT = int(os.getenv('OTEAPI_REDIS_PORT', '6379')) module-attribute

CeleryConfig

Bases: AttrDict

Celery configuration.

All fields here (including those added from the session through the get() method, as well as those added "anonymously") will be used as keyword arguments to the send_task() method for the Celery App.

Note

Using alias for the name field to favor populating it with task_name arguments, since this is the "original" field name. I.e., this is done for backwards compatibility.

Special pydantic configuration settings:

  • populate_by_name Allow populating CeleryConfig.name using name as well as task_name.
Source code in oteapi/strategies/transformation/celery_remote.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class CeleryConfig(AttrDict):
    """Configuration passed on to Celery.

    Every field of this model is forwarded as a keyword argument to the Celery
    App's `send_task()` method. That includes fields added from the session
    through the `get()` method and fields added "anonymously".

    Note:
        The `name` field carries the alias `task_name`, which is the "original"
        field name. The alias exists for backwards compatibility, so that
        `task_name` arguments populate `name`.

    Special pydantic configuration settings:

    - **`populate_by_name`**
      `CeleryConfig.name` may be populated through `name` as well as through
      `task_name`.

    """

    model_config = ConfigDict(populate_by_name=True)

    # Both fields are required (no default).
    name: str = Field(default=..., description="A task name.", alias="task_name")
    args: list = Field(default=..., description="List of arguments for the task.")
args: list = Field(..., description='List of arguments for the task.') class-attribute instance-attribute
model_config = ConfigDict(populate_by_name=True) class-attribute instance-attribute
name: str = Field(..., description='A task name.', alias='task_name') class-attribute instance-attribute

CeleryContent

Bases: AttrDict

Class for returning values from a Celery task.

Source code in oteapi/strategies/transformation/celery_remote.py
60
61
62
63
class CeleryContent(AttrDict):
    """Return-value model for a submitted Celery task."""

    # Required field (no default).
    celery_task_id: str = Field(
        default=...,
        description="A Celery task identifier.",
    )
celery_task_id: str = Field(..., description='A Celery task identifier.') class-attribute instance-attribute

CeleryRemoteStrategy

Submit job to remote Celery runner.

Registers strategies:

  • ("transformationType", "celery/remote")
Source code in oteapi/strategies/transformation/celery_remote.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
@dataclass
class CeleryRemoteStrategy:
    """Submit a job to a remote Celery runner.

    **Registers strategies**:

    - `("transformationType", "celery/remote")`

    """

    transformation_config: CeleryStrategyConfig

    def initialize(self) -> AttrDict:
        """Return an empty `AttrDict`; no initialization work is performed."""
        empty_result = AttrDict()
        return empty_result

    def get(self) -> CeleryContent:
        """Send the configured task to the Celery app and return its task ID."""
        # The dumped configuration becomes the keyword arguments of `send_task()`.
        task_kwargs = self.transformation_config.configuration.model_dump()
        submitted: Union[AsyncResult, Any] = CELERY_APP.send_task(**task_kwargs)
        return CeleryContent(celery_task_id=submitted.task_id)

    def status(self, task_id: str) -> TransformationStatus:
        """Look up the state of the task identified by `task_id`."""
        current_state = AsyncResult(id=task_id, app=CELERY_APP).state
        return TransformationStatus(id=task_id, status=current_state)
transformation_config: CeleryStrategyConfig instance-attribute
get()

Run a job, return a job ID.

Source code in oteapi/strategies/transformation/celery_remote.py
90
91
92
93
94
95
96
def get(self) -> CeleryContent:
    """Send the configured task to the Celery app and return its task ID."""
    # The dumped configuration becomes the keyword arguments of `send_task()`.
    task_kwargs = self.transformation_config.configuration.model_dump()
    submitted: Union[AsyncResult, Any] = CELERY_APP.send_task(**task_kwargs)
    return CeleryContent(celery_task_id=submitted.task_id)
initialize()

Initialize a job.

Source code in oteapi/strategies/transformation/celery_remote.py
 98
 99
100
def initialize(self) -> AttrDict:
    """Return an empty `AttrDict`; no initialization work is performed."""
    empty_result = AttrDict()
    return empty_result
status(task_id)

Get job status.

Source code in oteapi/strategies/transformation/celery_remote.py
102
103
104
105
def status(self, task_id: str) -> TransformationStatus:
    """Look up the state of the task identified by `task_id`."""
    current_state = AsyncResult(id=task_id, app=CELERY_APP).state
    return TransformationStatus(id=task_id, status=current_state)

CeleryStrategyConfig

Bases: TransformationConfig

Celery strategy-specific configuration.

Source code in oteapi/strategies/transformation/celery_remote.py
66
67
68
69
70
71
72
73
74
75
class CeleryStrategyConfig(TransformationConfig):
    """Transformation configuration model for the `celery/remote` strategy."""

    # Only the literal "celery/remote" is accepted; the description text is
    # reused from the parent `TransformationConfig` field.
    transformationType: Literal["celery/remote"] = Field(
        default="celery/remote",
        description=TransformationConfig.model_fields["transformationType"].description,
    )
    # Required field (no default) holding the Celery-specific settings.
    configuration: CeleryConfig = Field(
        default=...,
        description="Celery transformation strategy-specific configuration.",
    )
configuration: CeleryConfig = Field(..., description='Celery transformation strategy-specific configuration.') class-attribute instance-attribute
transformationType: Literal['celery/remote'] = Field('celery/remote', description=TransformationConfig.model_fields['transformationType'].description) class-attribute instance-attribute