Skip to content

OTE-API Core Strategies

This page provides documentation for the oteapi.strategies submodule, where all the core OTE-API strategies are located.

These strategies will always be available when setting up a server based on the OTE-API Core package.

download

file

Download strategy class for the file scheme.

FileConfig

Bases: AttrDict

File-specific Configuration Data Model.

Source code in oteapi/strategies/download/file.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class FileConfig(AttrDict):
    """File-specific Configuration Data Model."""

    text: bool = Field(
        False,
        description=(
            "Whether the file should be opened in text mode. If `False`, the file will"
            " be opened in bytes mode."
        ),
    )
    encoding: Optional[str] = Field(
        None,
        description=(
            "Encoding used when opening the file. The default is platform dependent."
        ),
    )
    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configurations for the data cache for storing the downloaded file content.",
    )
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configurations for the data cache for storing the downloaded file content.') class-attribute instance-attribute
encoding: Optional[str] = Field(None, description='Encoding used when opening the file. The default is platform dependent.') class-attribute instance-attribute
text: bool = Field(False, description='Whether the file should be opened in text mode. If `False`, the file will be opened in bytes mode.') class-attribute instance-attribute

FileResourceConfig

Bases: ResourceConfig

File download strategy filter config.

Source code in oteapi/strategies/download/file.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class FileResourceConfig(ResourceConfig):
    """File download strategy filter config."""

    downloadUrl: FileUrl = Field(  # type: ignore[assignment]
        ..., description="The file URL, which will be downloaded."
    )
    configuration: FileConfig = Field(
        FileConfig(), description="File download strategy-specific configuration."
    )

    @validator("downloadUrl")
    def ensure_path_exists(cls, value: FileUrl) -> FileUrl:
        """Ensure `path` is defined in `downloadUrl`."""
        if not value.path:
            raise ValueError("downloadUrl must contain a `path` part.")
        return value
configuration: FileConfig = Field(FileConfig(), description='File download strategy-specific configuration.') class-attribute instance-attribute
downloadUrl: FileUrl = Field(..., description='The file URL, which will be downloaded.') class-attribute instance-attribute
ensure_path_exists(value)

Ensure path is defined in downloadUrl.

Source code in oteapi/strategies/download/file.py
48
49
50
51
52
53
@validator("downloadUrl")
def ensure_path_exists(cls, value: FileUrl) -> FileUrl:
    """Ensure `path` is defined in `downloadUrl`."""
    if not value.path:
        raise ValueError("downloadUrl must contain a `path` part.")
    return value

FileStrategy

Strategy for retrieving data from a local file.

Registers strategies:

  • ("scheme", "file")
Source code in oteapi/strategies/download/file.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@dataclass
class FileStrategy:
    """Strategy for retrieving data from a local file.

    **Registers strategies**:

    - `("scheme", "file")`

    """

    download_config: FileResourceConfig

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize."""
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateFile:
        """Read local file."""
        filename = uri_to_path(self.download_config.downloadUrl).resolve()

        if not filename.exists():
            raise FileNotFoundError(f"File not found at {filename}")

        cache = DataCache(self.download_config.configuration.datacache_config)
        if cache.config.accessKey and cache.config.accessKey in cache:
            key = cache.config.accessKey
        else:
            key = cache.add(
                filename.read_text(encoding=self.download_config.configuration.encoding)
                if self.download_config.configuration.text
                else filename.read_bytes()
            )

        return SessionUpdateFile(key=key)
download_config: FileResourceConfig instance-attribute
get(session=None)

Read local file.

Source code in oteapi/strategies/download/file.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateFile:
    """Read local file."""
    filename = uri_to_path(self.download_config.downloadUrl).resolve()

    if not filename.exists():
        raise FileNotFoundError(f"File not found at {filename}")

    cache = DataCache(self.download_config.configuration.datacache_config)
    if cache.config.accessKey and cache.config.accessKey in cache:
        key = cache.config.accessKey
    else:
        key = cache.add(
            filename.read_text(encoding=self.download_config.configuration.encoding)
            if self.download_config.configuration.text
            else filename.read_bytes()
        )

    return SessionUpdateFile(key=key)
initialize(session=None)

Initialize.

Source code in oteapi/strategies/download/file.py
74
75
76
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize."""
    return SessionUpdate()

SessionUpdateFile

Bases: SessionUpdate

Class for returning values from Download File strategy.

Source code in oteapi/strategies/download/file.py
56
57
58
59
class SessionUpdateFile(SessionUpdate):
    """Class for returning values from Download File strategy."""

    key: str = Field(..., description="Key to access the data in the cache.")
key: str = Field(..., description='Key to access the data in the cache.') class-attribute instance-attribute

https

Download strategy class for http/https

HTTPSConfig

Bases: AttrDict

HTTP(S)-specific Configuration Data Model.

Source code in oteapi/strategies/download/https.py
16
17
18
19
20
21
22
class HTTPSConfig(AttrDict):
    """HTTP(S)-specific Configuration Data Model."""

    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configurations for the data cache for storing the downloaded file content.",
    )
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configurations for the data cache for storing the downloaded file content.') class-attribute instance-attribute

HTTPSResourceConfig

Bases: ResourceConfig

HTTP(S) download strategy filter config.

Source code in oteapi/strategies/download/https.py
25
26
27
28
29
30
31
32
33
class HTTPSResourceConfig(ResourceConfig):
    """HTTP(S) download strategy filter config."""

    downloadUrl: AnyHttpUrl = Field(  # type: ignore[assignment]
        ..., description="The HTTP(S) URL, which will be downloaded."
    )
    configuration: HTTPSConfig = Field(
        HTTPSConfig(), description="HTTP(S) download strategy-specific configuration."
    )
configuration: HTTPSConfig = Field(HTTPSConfig(), description='HTTP(S) download strategy-specific configuration.') class-attribute instance-attribute
downloadUrl: AnyHttpUrl = Field(..., description='The HTTP(S) URL, which will be downloaded.') class-attribute instance-attribute

HTTPSStrategy

Strategy for retrieving data via http.

Registers strategies:

  • ("scheme", "http")
  • ("scheme", "https")
Source code in oteapi/strategies/download/https.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@dataclass
class HTTPSStrategy:
    """Strategy for retrieving data via http.

    **Registers strategies**:

    - `("scheme", "http")`
    - `("scheme", "https")`

    """

    download_config: HTTPSResourceConfig

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize."""
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateHTTPS:
        """Download via http/https and store on local cache."""
        cache = DataCache(self.download_config.configuration.datacache_config)
        if cache.config.accessKey and cache.config.accessKey in cache:
            key = cache.config.accessKey
        else:
            req = requests.get(
                self.download_config.downloadUrl,
                allow_redirects=True,
                timeout=(3, 27),  # timeout: (connect, read) in seconds
            )
            key = cache.add(req.content)

        return SessionUpdateHTTPS(key=key)
download_config: HTTPSResourceConfig instance-attribute
get(session=None)

Download via http/https and store on local cache.

Source code in oteapi/strategies/download/https.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateHTTPS:
    """Download via http/https and store on local cache."""
    cache = DataCache(self.download_config.configuration.datacache_config)
    if cache.config.accessKey and cache.config.accessKey in cache:
        key = cache.config.accessKey
    else:
        req = requests.get(
            self.download_config.downloadUrl,
            allow_redirects=True,
            timeout=(3, 27),  # timeout: (connect, read) in seconds
        )
        key = cache.add(req.content)

    return SessionUpdateHTTPS(key=key)
initialize(session=None)

Initialize.

Source code in oteapi/strategies/download/https.py
55
56
57
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize."""
    return SessionUpdate()

SessionUpdateHTTPS

Bases: SessionUpdate

Class for returning values from Download HTTPS strategy.

Source code in oteapi/strategies/download/https.py
36
37
38
39
class SessionUpdateHTTPS(SessionUpdate):
    """Class for returning values from Download HTTPS strategy."""

    key: str = Field(..., description="Key to access the data in the cache.")
key: str = Field(..., description='Key to access the data in the cache.') class-attribute instance-attribute

sftp

Strategy class for sftp/ftp

AnyFtpUrl

Bases: AnyUrl

A (S)FTP URL model.

Source code in oteapi/strategies/download/sftp.py
18
19
20
21
class AnyFtpUrl(AnyUrl):
    """A (S)FTP URL model."""

    allowed_schemes = {"ftp", "sftp"}
allowed_schemes = {'ftp', 'sftp'} class-attribute instance-attribute

SFTPConfig

Bases: AttrDict

(S)FTP-specific Configuration Data Model.

Source code in oteapi/strategies/download/sftp.py
24
25
26
27
28
29
30
class SFTPConfig(AttrDict):
    """(S)FTP-specific Configuration Data Model."""

    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configurations for the data cache for storing the downloaded file content.",
    )
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configurations for the data cache for storing the downloaded file content.') class-attribute instance-attribute

SFTPResourceConfig

Bases: ResourceConfig

(S)FTP download strategy filter config.

Source code in oteapi/strategies/download/sftp.py
33
34
35
36
37
38
39
40
41
class SFTPResourceConfig(ResourceConfig):
    """(S)FTP download strategy filter config."""

    downloadUrl: AnyFtpUrl = Field(  # type: ignore[assignment]
        ..., description="The (S)FTP URL, which will be downloaded."
    )
    configuration: SFTPConfig = Field(
        SFTPConfig(), description="(S)FTP download strategy-specific configuration."
    )
configuration: SFTPConfig = Field(SFTPConfig(), description='(S)FTP download strategy-specific configuration.') class-attribute instance-attribute
downloadUrl: AnyFtpUrl = Field(..., description='The (S)FTP URL, which will be downloaded.') class-attribute instance-attribute

SFTPStrategy

Strategy for retrieving data via sftp.

Registers strategies:

  • ("scheme", "ftp")
  • ("scheme", "sftp")
Source code in oteapi/strategies/download/sftp.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@dataclass
class SFTPStrategy:
    """Strategy for retrieving data via sftp.

    **Registers strategies**:

    - `("scheme", "ftp")`
    - `("scheme", "sftp")`

    """

    download_config: SFTPResourceConfig

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize."""
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateSFTP:
        """Download via sftp"""
        cache = DataCache(self.download_config.configuration.datacache_config)
        if cache.config.accessKey and cache.config.accessKey in cache:
            key = cache.config.accessKey
        else:
            # Setup connection options
            cnopts = pysftp.CnOpts()
            cnopts.hostkeys = None

            # open connection and store data locally
            with pysftp.Connection(
                host=self.download_config.downloadUrl.host,
                username=self.download_config.downloadUrl.user,
                password=self.download_config.downloadUrl.password,
                port=self.download_config.downloadUrl.port,
                cnopts=cnopts,
            ) as sftp:
                # Because of insane locking on Windows, we have to close
                # the downloaded file before adding it to the cache
                with NamedTemporaryFile(prefix="oteapi-sftp-", delete=False) as handle:
                    localpath = Path(handle.name).resolve()
                try:
                    sftp.get(self.download_config.downloadUrl.path, localpath=localpath)
                    key = cache.add(localpath.read_bytes())
                finally:
                    localpath.unlink()

        return SessionUpdateSFTP(key=key)
download_config: SFTPResourceConfig instance-attribute
get(session=None)

Download via sftp

Source code in oteapi/strategies/download/sftp.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateSFTP:
    """Download via sftp"""
    cache = DataCache(self.download_config.configuration.datacache_config)
    if cache.config.accessKey and cache.config.accessKey in cache:
        key = cache.config.accessKey
    else:
        # Setup connection options
        cnopts = pysftp.CnOpts()
        cnopts.hostkeys = None

        # open connection and store data locally
        with pysftp.Connection(
            host=self.download_config.downloadUrl.host,
            username=self.download_config.downloadUrl.user,
            password=self.download_config.downloadUrl.password,
            port=self.download_config.downloadUrl.port,
            cnopts=cnopts,
        ) as sftp:
            # Because of insane locking on Windows, we have to close
            # the downloaded file before adding it to the cache
            with NamedTemporaryFile(prefix="oteapi-sftp-", delete=False) as handle:
                localpath = Path(handle.name).resolve()
            try:
                sftp.get(self.download_config.downloadUrl.path, localpath=localpath)
                key = cache.add(localpath.read_bytes())
            finally:
                localpath.unlink()

    return SessionUpdateSFTP(key=key)
initialize(session=None)

Initialize.

Source code in oteapi/strategies/download/sftp.py
63
64
65
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize."""
    return SessionUpdate()

SessionUpdateSFTP

Bases: SessionUpdate

Class for returning values from Download SFTP strategy.

Source code in oteapi/strategies/download/sftp.py
44
45
46
47
class SessionUpdateSFTP(SessionUpdate):
    """Class for returning values from Download SFTP strategy."""

    key: str = Field(..., description="Key to access the data in the cache.")
key: str = Field(..., description='Key to access the data in the cache.') class-attribute instance-attribute

filter

crop_filter

Demo-filter strategy

CropImageConfig

Bases: AttrDict

Configuration model for crop data.

Source code in oteapi/strategies/filter/crop_filter.py
14
15
16
17
18
19
class CropImageConfig(AttrDict):
    """Configuration model for crop data."""

    crop: Tuple[int, int, int, int] = Field(
        ..., description="Box cropping parameters (left, top, right, bottom)."
    )
crop: Tuple[int, int, int, int] = Field(..., description='Box cropping parameters (left, top, right, bottom).') class-attribute instance-attribute

CropImageFilter

Strategy for cropping an image.

Registers strategies:

  • ("filterType", "filter/crop")
Source code in oteapi/strategies/filter/crop_filter.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@dataclass
class CropImageFilter:
    """Strategy for cropping an image.

    **Registers strategies**:

    - `("filterType", "filter/crop")`

    """

    filter_config: CropImageFilterConfig

    def initialize(
        self,
        session: "Optional[Dict[str, Any]]" = None,
    ) -> SessionUpdateCropFilter:
        """Initialize strategy and return a dictionary."""
        return SessionUpdateCropFilter(
            imagecrop=self.filter_config.configuration.crop,
        )

    def get(
        self,
        session: "Optional[Dict[str, Any]]" = None,
    ) -> SessionUpdate:
        """Execute strategy and return a dictionary"""
        return SessionUpdate()
filter_config: CropImageFilterConfig instance-attribute
get(session=None)

Execute strategy and return a dictionary

Source code in oteapi/strategies/filter/crop_filter.py
64
65
66
67
68
69
def get(
    self,
    session: "Optional[Dict[str, Any]]" = None,
) -> SessionUpdate:
    """Execute strategy and return a dictionary"""
    return SessionUpdate()
initialize(session=None)

Initialize strategy and return a dictionary.

Source code in oteapi/strategies/filter/crop_filter.py
55
56
57
58
59
60
61
62
def initialize(
    self,
    session: "Optional[Dict[str, Any]]" = None,
) -> SessionUpdateCropFilter:
    """Initialize strategy and return a dictionary."""
    return SessionUpdateCropFilter(
        imagecrop=self.filter_config.configuration.crop,
    )

CropImageFilterConfig

Bases: FilterConfig

Crop filter strategy filter config.

Source code in oteapi/strategies/filter/crop_filter.py
22
23
24
25
26
27
28
29
30
31
32
class CropImageFilterConfig(FilterConfig):
    """Crop filter strategy filter config."""

    filterType: str = Field(
        "filter/crop",
        const=True,
        description=FilterConfig.__fields__["filterType"].field_info.description,
    )
    configuration: CropImageConfig = Field(
        ..., description="Image crop filter strategy-specific configuration."
    )
configuration: CropImageConfig = Field(..., description='Image crop filter strategy-specific configuration.') class-attribute instance-attribute
filterType: str = Field('filter/crop', const=True, description=FilterConfig.__fields__['filterType'].field_info.description) class-attribute instance-attribute

SessionUpdateCropFilter

Bases: SessionUpdate

Return model for CropImageFilter.

Source code in oteapi/strategies/filter/crop_filter.py
35
36
37
38
39
40
class SessionUpdateCropFilter(SessionUpdate):
    """Return model for `CropImageFilter`."""

    imagecrop: Tuple[int, int, int, int] = Field(
        ..., description="Box cropping parameters (left, top, right, bottom)."
    )
imagecrop: Tuple[int, int, int, int] = Field(..., description='Box cropping parameters (left, top, right, bottom).') class-attribute instance-attribute

sql_query_filter

SQL query filter strategy.

SQLQueryFilter

Strategy for a SQL query filter.

Registers strategies:

  • ("filterType", "filter/sql")
Source code in oteapi/strategies/filter/sql_query_filter.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@dataclass
class SQLQueryFilter:
    """Strategy for a SQL query filter.

    **Registers strategies**:

    - `("filterType", "filter/sql")`

    """

    filter_config: SqlQueryFilterConfig

    def initialize(
        self,
        session: "Optional[Dict[str, Any]]" = None,
    ) -> SessionUpdateSqlQuery:
        """Initialize strategy."""
        return SessionUpdateSqlQuery(sqlquery=self.filter_config.query)

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Execute strategy and return a dictionary."""
        return SessionUpdate()
filter_config: SqlQueryFilterConfig instance-attribute
get(session=None)

Execute strategy and return a dictionary.

Source code in oteapi/strategies/filter/sql_query_filter.py
50
51
52
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Execute strategy and return a dictionary."""
    return SessionUpdate()
initialize(session=None)

Initialize strategy.

Source code in oteapi/strategies/filter/sql_query_filter.py
43
44
45
46
47
48
def initialize(
    self,
    session: "Optional[Dict[str, Any]]" = None,
) -> SessionUpdateSqlQuery:
    """Initialize strategy."""
    return SessionUpdateSqlQuery(sqlquery=self.filter_config.query)

SessionUpdateSqlQuery

Bases: SessionUpdate

Class for returning values from SQL Query data model.

Source code in oteapi/strategies/filter/sql_query_filter.py
25
26
27
28
class SessionUpdateSqlQuery(SessionUpdate):
    """Class for returning values from SQL Query data model."""

    sqlquery: str = Field(..., description="A SQL query string.")
sqlquery: str = Field(..., description='A SQL query string.') class-attribute instance-attribute

SqlQueryFilterConfig

Bases: FilterConfig

SQL query filter strategy filter config.

Source code in oteapi/strategies/filter/sql_query_filter.py
14
15
16
17
18
19
20
21
22
class SqlQueryFilterConfig(FilterConfig):
    """SQL query filter strategy filter config."""

    filterType: str = Field(
        "filter/sql",
        const=True,
        description=FilterConfig.__fields__["filterType"].field_info.description,
    )
    query: str = Field(..., description="A SQL query string.")
filterType: str = Field('filter/sql', const=True, description=FilterConfig.__fields__['filterType'].field_info.description) class-attribute instance-attribute
query: str = Field(..., description='A SQL query string.') class-attribute instance-attribute

mapping

mapping

Mapping filter strategy.

MappingSessionUpdate

Bases: SessionUpdate

SessionUpdate model for mappings.

Source code in oteapi/strategies/mapping/mapping.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class MappingSessionUpdate(SessionUpdate):
    """SessionUpdate model for mappings."""

    prefixes: Dict[str, str] = Field(
        ...,
        description=(
            "Dictionary of shortnames that expands to an IRI "
            "given as local value/IRI-expansion-pairs."
        ),
    )
    triples: List[RDFTriple] = Field(
        ...,
        description="List of semantic triples given as (subject, predicate, object).",
    )
prefixes: Dict[str, str] = Field(..., description='Dictionary of shortnames that expands to an IRI given as local value/IRI-expansion-pairs.') class-attribute instance-attribute
triples: List[RDFTriple] = Field(..., description='List of semantic triples given as (subject, predicate, object).') class-attribute instance-attribute

MappingStrategy

Strategy for a mapping.

The mapping strategy simply adds more prefixes and triples to the prefixes and triples fields in the session such that they are available for other strategies, like function strategies that convert between data models.

Nothing is returned to avoid deleting existing mappings.

Registers strategies:

  • ("mappingType", "triples")
Source code in oteapi/strategies/mapping/mapping.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@dataclass
class MappingStrategy:
    """Strategy for a mapping.

    The mapping strategy simply adds more prefixes and triples to the
    `prefixes` and `triples` fields in the session such that they are
    available for other strategies, like function strategies that convert
    between data models.

    Nothing is returned to avoid deleting existing mappings.

    **Registers strategies**:

    - `("mappingType", "triples")`

    """

    mapping_config: MappingConfig

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> MappingSessionUpdate:
        """Initialize strategy."""
        prefixes = session.get("prefixes", {}) if session else {}
        triples = set(session.get("triples", []) if session else [])

        if self.mapping_config.prefixes:
            prefixes.update(self.mapping_config.prefixes)
        if self.mapping_config.triples:
            triples.update(self.mapping_config.triples)

        return MappingSessionUpdate(prefixes=prefixes, triples=triples)

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Execute strategy and return a dictionary."""
        return SessionUpdate()
mapping_config: MappingConfig instance-attribute
get(session=None)

Execute strategy and return a dictionary.

Source code in oteapi/strategies/mapping/mapping.py
62
63
64
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Execute strategy and return a dictionary."""
    return SessionUpdate()
initialize(session=None)

Initialize strategy.

Source code in oteapi/strategies/mapping/mapping.py
48
49
50
51
52
53
54
55
56
57
58
59
60
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> MappingSessionUpdate:
    """Initialize strategy."""
    prefixes = session.get("prefixes", {}) if session else {}
    triples = set(session.get("triples", []) if session else [])

    if self.mapping_config.prefixes:
        prefixes.update(self.mapping_config.prefixes)
    if self.mapping_config.triples:
        triples.update(self.mapping_config.triples)

    return MappingSessionUpdate(prefixes=prefixes, triples=triples)

parse

application_json

Strategy class for application/json.

JSONConfig

Bases: AttrDict

JSON parse-specific Configuration Data Model.

Source code in oteapi/strategies/parse/application_json.py
17
18
19
20
21
22
23
class JSONConfig(AttrDict):
    """JSON parse-specific Configuration Data Model."""

    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configurations for the data cache for storing the downloaded file content.",
    )
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configurations for the data cache for storing the downloaded file content.') class-attribute instance-attribute

JSONDataParseStrategy

Parse strategy for JSON.

Registers strategies:

  • ("mediaType", "application/json")
Source code in oteapi/strategies/parse/application_json.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@dataclass
class JSONDataParseStrategy:
    """Parse strategy for JSON.

    **Registers strategies**:

    - `("mediaType", "application/json")`

    """

    parse_config: JSONResourceConfig

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize."""
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateJSONParse:
        """Parse json."""
        downloader = create_strategy("download", self.parse_config)
        output = downloader.get()
        cache = DataCache(self.parse_config.configuration.datacache_config)
        content = cache.get(output["key"])

        if isinstance(content, dict):
            return SessionUpdateJSONParse(content=content)
        return SessionUpdateJSONParse(content=json.loads(content))
parse_config: JSONResourceConfig instance-attribute
get(session=None)

Parse json.

Source code in oteapi/strategies/parse/application_json.py
61
62
63
64
65
66
67
68
69
70
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateJSONParse:
    """Parse json."""
    downloader = create_strategy("download", self.parse_config)
    output = downloader.get()
    cache = DataCache(self.parse_config.configuration.datacache_config)
    content = cache.get(output["key"])

    if isinstance(content, dict):
        return SessionUpdateJSONParse(content=content)
    return SessionUpdateJSONParse(content=json.loads(content))
initialize(session=None)

Initialize.

Source code in oteapi/strategies/parse/application_json.py
57
58
59
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize."""
    return SessionUpdate()

JSONResourceConfig

Bases: ResourceConfig

JSON parse strategy filter config.

Source code in oteapi/strategies/parse/application_json.py
26
27
28
29
30
31
32
33
34
35
36
class JSONResourceConfig(ResourceConfig):
    """JSON parse strategy filter config."""

    mediaType: str = Field(
        "application/json",
        const=True,
        description=ResourceConfig.__fields__["mediaType"].field_info.description,
    )
    configuration: JSONConfig = Field(
        JSONConfig(), description="JSON parse strategy-specific configuration."
    )
configuration: JSONConfig = Field(JSONConfig(), description='JSON parse strategy-specific configuration.') class-attribute instance-attribute
mediaType: str = Field('application/json', const=True, description=ResourceConfig.__fields__['mediaType'].field_info.description) class-attribute instance-attribute

SessionUpdateJSONParse

Bases: SessionUpdate

Class for returning values from JSON Parse.

Source code in oteapi/strategies/parse/application_json.py
39
40
41
42
class SessionUpdateJSONParse(SessionUpdate):
    """Class for returning values from JSON Parse."""

    content: dict = Field(..., description="Content of the JSON document.")
content: dict = Field(..., description='Content of the JSON document.') class-attribute instance-attribute

application_vnd_sqlite

Strategy class for application/vnd.sqlite3.

SessionUpdateSqLiteParse

Bases: SessionUpdate

Configuration model for SqLiteParse.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
62
63
64
65
class SessionUpdateSqLiteParse(SessionUpdate):
    """Configuration model for SqLiteParse."""

    result: list = Field(..., description="List of results from the query.")
result: list = Field(..., description='List of results from the query.') class-attribute instance-attribute

SqliteParseConfig

Bases: AttrDict

Configuration data model for SqliteParseStrategy.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
18
19
20
21
22
23
24
25
26
27
class SqliteParseConfig(AttrDict):
    """Configuration data model for
    [`SqliteParseStrategy`][oteapi.strategies.parse.application_vnd_sqlite.SqliteParseStrategy].
    """

    sqlquery: str = Field("", description="A SQL query string.")
    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configuration options for the local data cache.",
    )
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configuration options for the local data cache.') class-attribute instance-attribute
sqlquery: str = Field('', description='A SQL query string.') class-attribute instance-attribute

SqliteParseStrategy

Parse strategy for SQLite.

Registers strategies:

  • ("mediaType", "application/vnd.sqlite3")

Purpose of this strategy: Download a SQLite database using downloadUrl and run a SQL query on the database to return all relevant rows.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@dataclass
class SqliteParseStrategy:
    """Parse strategy for SQLite.

    **Registers strategies**:

    - `("mediaType", "application/vnd.sqlite3")`

    Purpose of this strategy: Download a SQLite database using `downloadUrl` and run a
    SQL query on the database to return all relevant rows.

    """

    parse_config: SqliteParserResourceConfig

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize strategy."""
        return SessionUpdate()

    def get(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> SessionUpdateSqLiteParse:
        """Parse SQLite query responses."""
        if session:
            self._use_filters(session)
        session = session if session else {}

        # Retrieve SQLite file
        download_config = self.parse_config.copy()
        del download_config.configuration
        downloader = create_strategy("download", download_config)
        session.update(downloader.initialize(session))
        cache_key = downloader.get(session).get("key", "")

        cache = DataCache(self.parse_config.configuration.datacache_config)
        with cache.getfile(cache_key, suffix="db") as filename:
            connection = create_connection(filename)
            cursor = connection.cursor()
            result = cursor.execute(self.parse_config.configuration.sqlquery).fetchall()
            connection.close()
        return SessionUpdateSqLiteParse(result=result)

    def _use_filters(self, session: "Dict[str, Any]") -> None:
        """Update `config` according to filter values found in the session."""
        if "sqlquery" in session and not self.parse_config.configuration.sqlquery:
            # Use SQL query available in session
            self.parse_config.configuration.sqlquery = session["sqlquery"]
parse_config: SqliteParserResourceConfig instance-attribute
get(session=None)

Parse SQLite query responses.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def get(
    self, session: "Optional[Dict[str, Any]]" = None
) -> SessionUpdateSqLiteParse:
    """Parse SQLite query responses."""
    if session:
        self._use_filters(session)
    session = session if session else {}

    # Retrieve SQLite file
    download_config = self.parse_config.copy()
    del download_config.configuration
    downloader = create_strategy("download", download_config)
    session.update(downloader.initialize(session))
    cache_key = downloader.get(session).get("key", "")

    cache = DataCache(self.parse_config.configuration.datacache_config)
    with cache.getfile(cache_key, suffix="db") as filename:
        connection = create_connection(filename)
        cursor = connection.cursor()
        result = cursor.execute(self.parse_config.configuration.sqlquery).fetchall()
        connection.close()
    return SessionUpdateSqLiteParse(result=result)
initialize(session=None)

Initialize strategy.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
83
84
85
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize strategy."""
    return SessionUpdate()

SqliteParserResourceConfig

Bases: ResourceConfig

SQLite parse strategy resource config.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
30
31
32
33
34
35
36
37
38
39
40
class SqliteParserResourceConfig(ResourceConfig):
    """SQLite parse strategy resource config."""

    mediaType: str = Field(
        "application/vnd.sqlite3",
        const=True,
        description=ResourceConfig.__fields__["mediaType"].field_info.description,
    )
    configuration: SqliteParseConfig = Field(
        SqliteParseConfig(), description="SQLite parse strategy-specific configuration."
    )
configuration: SqliteParseConfig = Field(SqliteParseConfig(), description='SQLite parse strategy-specific configuration.') class-attribute instance-attribute
mediaType: str = Field('application/vnd.sqlite3', const=True, description=ResourceConfig.__fields__['mediaType'].field_info.description) class-attribute instance-attribute

create_connection(db_file)

Create a database connection to SQLite database.

Parameters:

Name Type Description Default
db_file Path

Full path to SQLite database file.

required

Raises:

Type Description
Error

If a DB connection cannot be made.

Returns:

Type Description
Connection

Connection object.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def create_connection(db_file: Path) -> sqlite3.Connection:
    """Create a database connection to SQLite database.

    Parameters:
        db_file: Full path to SQLite database file.

    Raises:
        sqlite3.Error: If a DB connection cannot be made.

    Returns:
        Connection object.

    """
    try:
        return sqlite3.connect(db_file)
    except sqlite3.Error as exc:
        raise sqlite3.Error("Could not connect to given SQLite DB.") from exc

excel_xlsx

Strategy class for workbook/xlsx.

SessionUpdateXLSXParse

Bases: SessionUpdate

Class for returning values from XLSXParse.

Source code in oteapi/strategies/parse/excel_xlsx.py
19
20
21
22
23
24
25
class SessionUpdateXLSXParse(SessionUpdate):
    """Class for returning values from XLSXParse."""

    data: Dict[str, list] = Field(
        ...,
        description="A dict with column-name/column-value pairs. The values are lists.",
    )
data: Dict[str, list] = Field(..., description='A dict with column-name/column-value pairs. The values are lists.') class-attribute instance-attribute

XLSXParseConfig

Bases: AttrDict

Data model for retrieving a rectangular section of an Excel sheet.

Source code in oteapi/strategies/parse/excel_xlsx.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class XLSXParseConfig(AttrDict):
    """Data model for retrieving a rectangular section of an Excel sheet."""

    worksheet: str = Field(..., description="Name of worksheet to load.")
    row_from: Optional[int] = Field(
        None,
        description="Excel row number of first row. Defaults to first assigned row.",
    )
    col_from: Optional[Union[int, str]] = Field(
        None,
        description=(
            "Excel column number or label of first column. Defaults to first assigned "
            "column."
        ),
    )
    row_to: Optional[int] = Field(
        None, description="Excel row number of last row. Defaults to last assigned row."
    )
    col_to: Optional[Union[int, str]] = Field(
        None,
        description=(
            "Excel column number or label of last column. Defaults to last assigned "
            "column."
        ),
    )
    header_row: Optional[int] = Field(
        None,
        description=(
            "Row number with the headers. Defaults to `1` if header is given, "
            "otherwise `None`."
        ),
    )
    header: Optional[List[str]] = Field(
        None,
        description=(
            "Optional list of column names, specifying the columns to return. "
            "These names they should match cells in `header_row`."
        ),
    )
    new_header: Optional[List[str]] = Field(
        None,
        description=(
            "Optional list of new column names replacing `header` in the output."
        ),
    )
    download_config: AttrDict = Field(
        AttrDict(),
        description="Configurations provided to a download strategy.",
    )
    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configurations for the data cache for retrieving the downloaded content.",
    )
col_from: Optional[Union[int, str]] = Field(None, description='Excel column number or label of first column. Defaults to first assigned column.') class-attribute instance-attribute
col_to: Optional[Union[int, str]] = Field(None, description='Excel column number or label of last column. Defaults to last assigned column.') class-attribute instance-attribute
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configurations for the data cache for retrieving the downloaded content.') class-attribute instance-attribute
download_config: AttrDict = Field(AttrDict(), description='Configurations provided to a download strategy.') class-attribute instance-attribute
header: Optional[List[str]] = Field(None, description='Optional list of column names, specifying the columns to return. These names they should match cells in `header_row`.') class-attribute instance-attribute
header_row: Optional[int] = Field(None, description='Row number with the headers. Defaults to `1` if header is given, otherwise `None`.') class-attribute instance-attribute
new_header: Optional[List[str]] = Field(None, description='Optional list of new column names replacing `header` in the output.') class-attribute instance-attribute
row_from: Optional[int] = Field(None, description='Excel row number of first row. Defaults to first assigned row.') class-attribute instance-attribute
row_to: Optional[int] = Field(None, description='Excel row number of last row. Defaults to last assigned row.') class-attribute instance-attribute
worksheet: str = Field(..., description='Name of worksheet to load.') class-attribute instance-attribute

XLSXParseParserConfig

Bases: ParserConfig

XLSX parse strategy resource config.

Source code in oteapi/strategies/parse/excel_xlsx.py
83
84
85
86
87
88
89
90
91
92
93
class XLSXParseParserConfig(ParserConfig):
    """XLSX parse strategy resource config."""

    parserType: str = Field(
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        const=True,
        description=ParserConfig.__fields__["parserType"].field_info.description,
    )
    configuration: XLSXParseConfig = Field(
        ..., description="SQLite parse strategy-specific configuration."
    )
configuration: XLSXParseConfig = Field(..., description='SQLite parse strategy-specific configuration.') class-attribute instance-attribute
parserType: str = Field('application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', const=True, description=ParserConfig.__fields__['parserType'].field_info.description) class-attribute instance-attribute

XLSXParseStrategy

Parse strategy for Excel XLSX files.

Registers strategies:

  • ("parserType", "excel_xlsx")
Source code in oteapi/strategies/parse/excel_xlsx.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
@dataclass
class XLSXParseStrategy:
    """Parse strategy for Excel XLSX files.

    **Registers strategies**:

    - `("parserType", "excel_xlsx")`

    """

    parse_config: XLSXParseParserConfig

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize."""
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateXLSXParse:
        """Parses selected region of an excel file.

        Returns:
            A dict with column-name/column-value pairs. The values are lists.

        """

        cache = DataCache(self.parse_config.configuration.datacache_config)
        if session is None:
            raise ValueError("Missing session")
        with cache.getfile(key=session["key"], suffix=".xlsx") as filename:
            # Note that we have to set read_only=False to ensure that
            # load_workbook() properly closes the xlsx file after reading.
            # Otherwise Windows will fail when the temporary file is removed
            # when leaving the with statement.
            workbook = load_workbook(filename=filename, read_only=False, data_only=True)

        worksheet = workbook[self.parse_config.configuration.worksheet]
        set_model_defaults(self.parse_config.configuration, worksheet)
        columns = get_column_indices(self.parse_config.configuration, worksheet)

        data = []
        for row in worksheet.iter_rows(
            min_row=self.parse_config.configuration.row_from,
            max_row=self.parse_config.configuration.row_to,
            min_col=min(columns),
            max_col=max(columns),
        ):
            data.append([row[c - 1].value for c in columns])

        if self.parse_config.configuration.header_row:
            row = worksheet.iter_rows(
                min_row=self.parse_config.configuration.header_row,
                max_row=self.parse_config.configuration.header_row,
                min_col=min(columns),
                max_col=max(columns),
            ).__next__()
            header = [row[c - 1].value for c in columns]
        else:
            header = None

        if self.parse_config.configuration.new_header:
            nhead = len(header) if header else len(data[0]) if data else 0
            if len(self.parse_config.configuration.new_header) != nhead:
                raise TypeError(
                    f"length of `new_header` (={len(self.parse_config.configuration.new_header)}) "
                    f"doesn't match number of columns (={len(header) if header else 0})"
                )
            if header:
                for i, val in enumerate(self.parse_config.configuration.new_header):
                    if val is not None:
                        header[i] = val
            elif data:
                header = self.parse_config.configuration.new_header

        if header is None:
            header = [get_column_letter(col + 1) for col in range(len(data))]

        transposed = [list(datum) for datum in zip(*data)]
        return SessionUpdateXLSXParse(
            data={key: value for key, value in zip(header, transposed)}
        )
parse_config: XLSXParseParserConfig instance-attribute
get(session=None)

Parses selected region of an excel file.

Returns:

Type Description
SessionUpdateXLSXParse

A dict with column-name/column-value pairs. The values are lists.

Source code in oteapi/strategies/parse/excel_xlsx.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateXLSXParse:
    """Parses selected region of an excel file.

    Returns:
        A dict with column-name/column-value pairs. The values are lists.

    """

    cache = DataCache(self.parse_config.configuration.datacache_config)
    if session is None:
        raise ValueError("Missing session")
    with cache.getfile(key=session["key"], suffix=".xlsx") as filename:
        # Note that we have to set read_only=False to ensure that
        # load_workbook() properly closes the xlsx file after reading.
        # Otherwise Windows will fail when the temporary file is removed
        # when leaving the with statement.
        workbook = load_workbook(filename=filename, read_only=False, data_only=True)

    worksheet = workbook[self.parse_config.configuration.worksheet]
    set_model_defaults(self.parse_config.configuration, worksheet)
    columns = get_column_indices(self.parse_config.configuration, worksheet)

    data = []
    for row in worksheet.iter_rows(
        min_row=self.parse_config.configuration.row_from,
        max_row=self.parse_config.configuration.row_to,
        min_col=min(columns),
        max_col=max(columns),
    ):
        data.append([row[c - 1].value for c in columns])

    if self.parse_config.configuration.header_row:
        row = worksheet.iter_rows(
            min_row=self.parse_config.configuration.header_row,
            max_row=self.parse_config.configuration.header_row,
            min_col=min(columns),
            max_col=max(columns),
        ).__next__()
        header = [row[c - 1].value for c in columns]
    else:
        header = None

    if self.parse_config.configuration.new_header:
        nhead = len(header) if header else len(data[0]) if data else 0
        if len(self.parse_config.configuration.new_header) != nhead:
            raise TypeError(
                f"length of `new_header` (={len(self.parse_config.configuration.new_header)}) "
                f"doesn't match number of columns (={len(header) if header else 0})"
            )
        if header:
            for i, val in enumerate(self.parse_config.configuration.new_header):
                if val is not None:
                    header[i] = val
        elif data:
            header = self.parse_config.configuration.new_header

    if header is None:
        header = [get_column_letter(col + 1) for col in range(len(data))]

    transposed = [list(datum) for datum in zip(*data)]
    return SessionUpdateXLSXParse(
        data={key: value for key, value in zip(header, transposed)}
    )
initialize(session=None)

Initialize.

Source code in oteapi/strategies/parse/excel_xlsx.py
165
166
167
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize."""
    return SessionUpdate()

get_column_indices(model, worksheet)

Helper function returning a list of column indices.

Parameters:

Name Type Description Default
model XLSXParseConfig

The parsed data model.

required
worksheet Worksheet

Excel worksheet, from which the header values will be retrieved.

required

Returns:

Type Description
Iterable[int]

A list of column indices.

Source code in oteapi/strategies/parse/excel_xlsx.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def get_column_indices(
    model: XLSXParseConfig, worksheet: "Worksheet"
) -> "Iterable[int]":
    """Helper function returning a list of column indices.

    Parameters:
        model: The parsed data model.
        worksheet: Excel worksheet, from which the header values will be retrieved.

    Returns:
        A list of column indices.

    """
    if not isinstance(model.col_from, int) or not isinstance(model.col_to, int):
        raise TypeError("Expected `model.col_from` and `model.col_to` to be integers.")

    if model.header:
        header_dict = {
            worksheet.cell(model.header_row, col).value: col
            for col in range(model.col_from, model.col_to + 1)
        }
        return [header_dict[h] for h in model.header]
    return range(model.col_from, model.col_to + 1)

set_model_defaults(model, worksheet)

Update data model model with default values obtained from worksheet.

Parameters:

Name Type Description Default
model XLSXParseConfig

The parsed data model.

required
worksheet Worksheet

Excel worksheet, from which the default values will be obtained.

required
Source code in oteapi/strategies/parse/excel_xlsx.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def set_model_defaults(model: XLSXParseConfig, worksheet: "Worksheet") -> None:
    """Update data model `model` with default values obtained from `worksheet`.

    Parameters:
        model: The parsed data model.
        worksheet: Excel worksheet, from which the default values will be obtained.

    """
    if model.row_from is None:
        if model.header:
            # assume that data starts on the first row after the header
            model.row_from = model.header_row + 1 if model.header_row else 1
        else:
            model.row_from = worksheet.min_row

    if model.row_to is None:
        model.row_to = worksheet.max_row

    if model.col_from is None:
        model.col_from = worksheet.min_column
    elif isinstance(model.col_from, str):
        model.col_from = column_index_from_string(model.col_from)

    if model.col_to is None:
        model.col_to = worksheet.max_column
    elif isinstance(model.col_to, str):
        model.col_to = column_index_from_string(model.col_to)

    if model.header and not model.header_row:
        model.header_row = 1

image

Strategy class for image/jpg.

ImageDataParseStrategy

Parse strategy for images.

This strategy uses Pillow to read a raw image from the data cache, converts it into a NumPy array and stores the new array in the data cache.

It also supports simple cropping and image conversions.

The key to the new array and other metadata is stored in the session. See SessionUpdateImageParse for more info.

Registers strategies:

  • ("mediaType", "image/jpg")
  • ("mediaType", "image/jpeg")
  • ("mediaType", "image/jp2")
  • ("mediaType", "image/png")
  • ("mediaType", "image/gif")
  • ("mediaType", "image/tiff")
  • ("mediaType", "image/eps")
Source code in oteapi/strategies/parse/image.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
@dataclass
class ImageDataParseStrategy:
    """Parse strategy for images.

    This strategy uses Pillow to read a raw image from the data cache,
    converts it into a NumPy array and stores the new array in the
    data cache.

    It also supports simple cropping and image conversions.

    The key to the new array and other metadata is stored in the session. See
    [`SessionUpdateImageParse`][oteapi.strategies.parse.image.SessionUpdateImageParse]
    for more info.

    **Registers strategies**:

    - `("mediaType", "image/jpg")`
    - `("mediaType", "image/jpeg")`
    - `("mediaType", "image/jp2")`
    - `("mediaType", "image/png")`
    - `("mediaType", "image/gif")`
    - `("mediaType", "image/tiff")`
    - `("mediaType", "image/eps")`

    """

    parse_config: ImageParserResourceConfig

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize strategy."""
        return SessionUpdate()

    def get(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> SessionUpdateImageParse:
        """Execute the strategy."""
        if not session:
            session = {}

        config = self.parse_config.configuration
        crop = config.crop if config.crop else session.get("imagecrop")

        mime_format = self.parse_config.mediaType.split("/")[1]
        image_format = SupportedFormat[mime_format].value

        # Proper download configurations
        conf = self.parse_config.dict()
        conf["configuration"] = config.download_config or {}
        download_config = ResourceConfig(**conf)

        downloader = create_strategy("download", download_config)
        session.update(downloader.initialize(session))

        downloader = create_strategy("download", download_config)
        output = downloader.get(session)
        session.update(output)

        if config.datacache_config and config.datacache_config.accessKey:
            cache_key = config.datacache_config.accessKey
        elif "key" in output:
            cache_key = output["key"]
        else:
            raise RuntimeError("No data cache key provided to the downloaded content")

        cache = DataCache(config.datacache_config)

        # Treat image according to filter values
        with cache.getfile(cache_key, suffix=mime_format) as filename:
            image = Image.open(filename, formats=[image_format])
            if crop:
                image = image.crop(crop)
            if config.image_mode:
                image = image.convert(mode=config.image_mode)

            if image_format == "GIF":
                if image.info.get("version", b"").startswith(b"GIF"):
                    image.info.update(
                        {"version": image.info.get("version", b"")[len(b"GIF") :]}
                    )

            # Use the buffer protocol to store the image in the datacache
            data = np.asarray(image)
            image_key = cache.add(
                data,
                key=config.image_key,
                tag=str(id(session)),
            )

            if image.mode == "P":
                image_palette_key = cache.add(
                    np.asarray(image.getpalette()), tag=str(id(session))
                )
            else:
                image_palette_key = None

            # The session must be json serialisable - filter out all
            # non-json serialisable fields in image.info
            if image.info:
                image_info = {
                    key: val
                    for key, val in image.info.items()
                    if isinstance(val, (str, int, float, type(None), bool, tuple, list))
                }
            else:
                image_info = {}

            session_update = SessionUpdateImageParse(
                image_key=image_key,
                image_size=image.size,
                image_mode=image.mode,
                image_palette_key=image_palette_key,
                image_info=image_info,
            )

            # Explicitly close the image to avoid crashes on Windows
            image.close()

        return session_update
parse_config: ImageParserResourceConfig instance-attribute
get(session=None)

Execute the strategy.

Source code in oteapi/strategies/parse/image.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def get(
    self, session: "Optional[Dict[str, Any]]" = None
) -> SessionUpdateImageParse:
    """Execute the strategy."""
    if not session:
        session = {}

    config = self.parse_config.configuration
    crop = config.crop if config.crop else session.get("imagecrop")

    mime_format = self.parse_config.mediaType.split("/")[1]
    image_format = SupportedFormat[mime_format].value

    # Proper download configurations
    conf = self.parse_config.dict()
    conf["configuration"] = config.download_config or {}
    download_config = ResourceConfig(**conf)

    downloader = create_strategy("download", download_config)
    session.update(downloader.initialize(session))

    downloader = create_strategy("download", download_config)
    output = downloader.get(session)
    session.update(output)

    if config.datacache_config and config.datacache_config.accessKey:
        cache_key = config.datacache_config.accessKey
    elif "key" in output:
        cache_key = output["key"]
    else:
        raise RuntimeError("No data cache key provided to the downloaded content")

    cache = DataCache(config.datacache_config)

    # Treat image according to filter values
    with cache.getfile(cache_key, suffix=mime_format) as filename:
        image = Image.open(filename, formats=[image_format])
        if crop:
            image = image.crop(crop)
        if config.image_mode:
            image = image.convert(mode=config.image_mode)

        if image_format == "GIF":
            if image.info.get("version", b"").startswith(b"GIF"):
                image.info.update(
                    {"version": image.info.get("version", b"")[len(b"GIF") :]}
                )

        # Use the buffer protocol to store the image in the datacache
        data = np.asarray(image)
        image_key = cache.add(
            data,
            key=config.image_key,
            tag=str(id(session)),
        )

        if image.mode == "P":
            image_palette_key = cache.add(
                np.asarray(image.getpalette()), tag=str(id(session))
            )
        else:
            image_palette_key = None

        # The session must be json serialisable - filter out all
        # non-json serialisable fields in image.info
        if image.info:
            image_info = {
                key: val
                for key, val in image.info.items()
                if isinstance(val, (str, int, float, type(None), bool, tuple, list))
            }
        else:
            image_info = {}

        session_update = SessionUpdateImageParse(
            image_key=image_key,
            image_size=image.size,
            image_mode=image.mode,
            image_palette_key=image_palette_key,
            image_info=image_info,
        )

        # Explicitly close the image to avoid crashes on Windows
        image.close()

    return session_update
initialize(session=None)

Initialize strategy.

Source code in oteapi/strategies/parse/image.py
145
146
147
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize strategy."""
    return SessionUpdate()

ImageParserConfig

Bases: AttrDict

Configuration data model for ImageDataParseStrategy.

Source code in oteapi/strategies/parse/image.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class ImageParserConfig(AttrDict):
    """Configuration data model for
    [`ImageDataParseStrategy`][oteapi.strategies.parse.image.ImageDataParseStrategy]."""

    crop: Optional[Tuple[int, int, int, int]] = Field(
        None,
        description="Box cropping parameters (left, top, right, bottom).",
    )
    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configuration options for the local data cache.",
    )
    download_config: AttrDict = Field(
        AttrDict(),
        description="Configurations passed to the downloader.",
    )
    image_key: Optional[str] = Field(
        None,
        description="Key to use when storing the image data in datacache.",
    )
    image_mode: Optional[str] = Field(
        None,
        description=(
            "Pillow mode to convert image into. See "
            "https://pillow.readthedocs.io/en/stable/handbook/concepts.html "
            "for details."
        ),
    )
crop: Optional[Tuple[int, int, int, int]] = Field(None, description='Box cropping parameters (left, top, right, bottom).') class-attribute instance-attribute
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configuration options for the local data cache.') class-attribute instance-attribute
download_config: AttrDict = Field(AttrDict(), description='Configurations passed to the downloader.') class-attribute instance-attribute
image_key: Optional[str] = Field(None, description='Key to use when storing the image data in datacache.') class-attribute instance-attribute
image_mode: Optional[str] = Field(None, description='Pillow mode to convert image into. See https://pillow.readthedocs.io/en/stable/handbook/concepts.html for details.') class-attribute instance-attribute

ImageParserResourceConfig

Bases: ResourceConfig

Image parse strategy resource config.

Source code in oteapi/strategies/parse/image.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class ImageParserResourceConfig(ResourceConfig):
    """Image parse strategy resource config."""

    mediaType: Literal[
        "image/jpg",
        "image/jpeg",
        "image/jp2",
        "image/png",
        "image/gif",
        "image/tiff",
        "image/eps",
    ] = Field(
        ...,
        description=ResourceConfig.__fields__["mediaType"].field_info.description,
    )
    configuration: ImageParserConfig = Field(
        ImageParserConfig(),
        description="Image parse strategy-specific configuration.",
    )
configuration: ImageParserConfig = Field(ImageParserConfig(), description='Image parse strategy-specific configuration.') class-attribute instance-attribute
mediaType: Literal['image/jpg', 'image/jpeg', 'image/jp2', 'image/png', 'image/gif', 'image/tiff', 'image/eps'] = Field(..., description=ResourceConfig.__fields__['mediaType'].field_info.description) class-attribute instance-attribute

SessionUpdateImageParse

Bases: SessionUpdate

Configuration model for ImageParse.

See Pillow handbook for more details on image_mode, image_palette, and image_info.

Source code in oteapi/strategies/parse/image.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class SessionUpdateImageParse(SessionUpdate):
    """Configuration model for ImageParse.

    See [Pillow handbook](https://pillow.readthedocs.io/en/stable/handbook/concepts.html) for more details
    on `image_mode`, `image_palette`, and `image_info`.
    """

    image_key: str = Field(
        ...,
        description="Key with which the image content is stored in the data cache.",
    )
    image_size: Tuple[int, int] = Field(
        ...,
        description="Image size (width, height).",
    )
    image_mode: str = Field(
        ...,
        description="Image mode. Examples: 'L', 'P', 'RGB', 'RGBA'...",
    )
    image_palette_key: Optional[str] = Field(
        None,
        description="Datacache key for colour palette if mode is 'P'.",
    )
    image_info: dict = Field(
        {},
        description="Additional information about the image.",
    )
image_info: dict = Field({}, description='Additional information about the image.') class-attribute instance-attribute
image_key: str = Field(..., description='Key with which the image content is stored in the data cache.') class-attribute instance-attribute
image_mode: str = Field(..., description="Image mode. Examples: 'L', 'P', 'RGB', 'RGBA'...") class-attribute instance-attribute
image_palette_key: Optional[str] = Field(None, description="Datacache key for colour palette if mode is 'P'.") class-attribute instance-attribute
image_size: Tuple[int, int] = Field(..., description='Image size (width, height).') class-attribute instance-attribute

SupportedFormat

Bases: Enum

Supported formats for ImageDataParseStrategy.

Source code in oteapi/strategies/parse/image.py
76
77
78
79
80
81
82
83
84
85
class SupportedFormat(Enum):
    """Supported formats for `ImageDataParseStrategy`."""

    jpeg = "JPEG"
    jpg = "JPEG"
    jp2 = "JPEG2000"
    png = "PNG"
    gif = "GIF"
    tiff = "TIFF"
    eps = "EPS"
eps = 'EPS' class-attribute instance-attribute
gif = 'GIF' class-attribute instance-attribute
jp2 = 'JPEG2000' class-attribute instance-attribute
jpeg = 'JPEG' class-attribute instance-attribute
jpg = 'JPEG' class-attribute instance-attribute
png = 'PNG' class-attribute instance-attribute
tiff = 'TIFF' class-attribute instance-attribute

postgres

Strategy class for application/vnd.postgresql

PostgresConfig

Bases: AttrDict

Configuration data model for PostgresResourceStrategy.

Source code in oteapi/strategies/parse/postgres.py
13
14
15
16
17
18
19
20
21
22
class PostgresConfig(AttrDict):
    """Configuration data model for
    [`PostgresResourceStrategy`][oteapi.strategies.parse.postgres.PostgresResourceConfig].
    """

    user: Optional[str] = Field(None, description="postgres server username")
    dbname: Optional[str] = Field(None, description="postgres dbname name")
    password: Optional[str] = Field(None, description="postgres password")

    sqlquery: Optional[str] = Field("", description="A SQL query string.")
dbname: Optional[str] = Field(None, description='postgres dbname name') class-attribute instance-attribute
password: Optional[str] = Field(None, description='postgres password') class-attribute instance-attribute
sqlquery: Optional[str] = Field('', description='A SQL query string.') class-attribute instance-attribute
user: Optional[str] = Field(None, description='postgres server username') class-attribute instance-attribute

PostgresResourceConfig

Bases: ResourceConfig

Postgresql parse strategy config

Source code in oteapi/strategies/parse/postgres.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class PostgresResourceConfig(ResourceConfig):
    """Postgresql parse strategy config"""

    configuration: PostgresConfig = Field(
        PostgresConfig(),
        description=(
            "Configuration for resource. " "Values in the accessURL take precedence."
        ),
    )
    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configuration options for the local data cache.",
    )

    @classmethod
    def _urlconstruct(
        cls,  # PEP8 - Always use cls for the first argument to class methods.
        scheme: Optional[str] = "",  # Schema defining link format
        user: Optional[str] = None,  # Username
        password: Optional[str] = None,  # Password
        host: Optional[str] = None,
        port: Optional[int] = None,
        path: Optional[str] = "",
        params: Optional[str] = "",
        query: Optional[str] = "",
        fragment: Optional[str] = "",
    ):
        """Construct a pydantic AnyUrl based on the given URL properties"""

        # Hostname should always be given
        if not host:
            raise ValueError("hostname must be specified")

        # Update netloc of username or username|password pair is defined
        netloc = host
        if user and not password:  # Only username is provided. OK
            netloc = f"{user}@{host}"
        elif user and password:  # Username and password is provided. OK
            netloc = f"{user}:{password}@{host}"
        else:  # Password and no username is provided. ERROR
            raise ValueError("username not provided")

        # Append port if port is defined
        netloc = netloc if not port else f"{netloc}:{port}"

        # Construct a URL from a tuple of URL-properties
        unparsed = urlunparse([scheme, netloc, path, params, query, fragment])

        # Populate and return a Pydantic URL
        return parse_obj_as(AnyUrl, unparsed)

    @root_validator
    def adjust_url(cls, values):
        """Root Validator
        Verifies configuration consistency, merge configurations
        and update the accessUrl property.
        """

        # Copy model-state into placeholders
        config = values.get("configuration")
        accessUrl = values["accessUrl"]

        # Check and merge user configuration
        user = accessUrl.user if accessUrl.user else config["user"]
        if config["user"] and user != config["user"]:
            raise ValueError("mismatching username in accessUrl and configuration")

        # Check and merge password configuration
        password = accessUrl.password if accessUrl.password else config["password"]
        if config["password"] and password != config["password"]:
            raise ValueError("mismatching password in accessUrl and configuration")

        # Check and merge database name configuration
        dbname = accessUrl.path if accessUrl.path else config["dbname"]
        if config["dbname"] and dbname != config["dbname"]:
            raise ValueError("mismatching dbname in accessUrl and configuration")

        # Reconstruct accessUrl from the updated properties
        values["accessUrl"] = cls._urlconstruct(
            scheme=accessUrl.scheme,
            host=accessUrl.host,
            port=accessUrl.port,
            user=user,
            password=password,
        )
        return values
configuration: PostgresConfig = Field(PostgresConfig(), description='Configuration for resource. Values in the accessURL take precedence.') class-attribute instance-attribute
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configuration options for the local data cache.') class-attribute instance-attribute
adjust_url(values)

Root Validator Verifies configuration consistency, merge configurations and update the accessUrl property.

Source code in oteapi/strategies/parse/postgres.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
@root_validator
def adjust_url(cls, values):
    """Root Validator
    Verifies configuration consistency, merge configurations
    and update the accessUrl property.
    """

    # Copy model-state into placeholders
    config = values.get("configuration")
    accessUrl = values["accessUrl"]

    # Check and merge user configuration
    user = accessUrl.user if accessUrl.user else config["user"]
    if config["user"] and user != config["user"]:
        raise ValueError("mismatching username in accessUrl and configuration")

    # Check and merge password configuration
    password = accessUrl.password if accessUrl.password else config["password"]
    if config["password"] and password != config["password"]:
        raise ValueError("mismatching password in accessUrl and configuration")

    # Check and merge database name configuration
    dbname = accessUrl.path if accessUrl.path else config["dbname"]
    if config["dbname"] and dbname != config["dbname"]:
        raise ValueError("mismatching dbname in accessUrl and configuration")

    # Reconstruct accessUrl from the updated properties
    values["accessUrl"] = cls._urlconstruct(
        scheme=accessUrl.scheme,
        host=accessUrl.host,
        port=accessUrl.port,
        user=user,
        password=password,
    )
    return values

PostgresResourceStrategy

Resource strategy for Postgres.

Registers strategies:

  • ("accessService", "postgres")

Purpose of this strategy: Connect to a postgres DB and run a SQL query on the dbname to return all relevant rows.

Source code in oteapi/strategies/parse/postgres.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@dataclass
class PostgresResourceStrategy:
    """Resource strategy for Postgres.

    **Registers strategies**:

    - `("accessService", "postgres")`

    Purpose of this strategy: Connect to a postgres DB and run a
    SQL query on the dbname to return all relevant rows.

    """

    resource_config: PostgresResourceConfig

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize strategy."""
        return SessionUpdate()

    def get(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> SessionUpdatePostgresResource:
        """Resource Postgres query responses."""
        if session:
            self._use_filters(session)
        session = session if session else {}

        connection = create_connection(self.resource_config)
        cursor = connection.cursor()
        result = cursor.execute(self.resource_config.configuration.sqlquery).fetchall()
        connection.close()
        return SessionUpdatePostgresResource(result=result)

    def _use_filters(self, session: "Dict[str, Any]") -> None:
        """Update `config` according to filter values found in the session."""
        if "sqlquery" in session and not self.resource_config.configuration.sqlquery:
            # Use SQL query available in session
            self.resource_config.configuration.sqlquery = session["sqlquery"]
resource_config: PostgresResourceConfig instance-attribute
get(session=None)

Resource Postgres query responses.

Source code in oteapi/strategies/parse/postgres.py
158
159
160
161
162
163
164
165
166
167
168
169
170
def get(
    self, session: "Optional[Dict[str, Any]]" = None
) -> SessionUpdatePostgresResource:
    """Resource Postgres query responses."""
    if session:
        self._use_filters(session)
    session = session if session else {}

    connection = create_connection(self.resource_config)
    cursor = connection.cursor()
    result = cursor.execute(self.resource_config.configuration.sqlquery).fetchall()
    connection.close()
    return SessionUpdatePostgresResource(result=result)
initialize(session=None)

Initialize strategy.

Source code in oteapi/strategies/parse/postgres.py
154
155
156
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize strategy."""
    return SessionUpdate()

SessionUpdatePostgresResource

Bases: SessionUpdate

Configuration model for PostgresResource.

Source code in oteapi/strategies/parse/postgres.py
133
134
135
136
class SessionUpdatePostgresResource(SessionUpdate):
    """Configuration model for PostgresResource."""

    result: list = Field(..., description="List of results from the query.")
result: list = Field(..., description='List of results from the query.') class-attribute instance-attribute

create_connection(resource_config)

Create a dbname connection to Postgres dbname.

Parameters:

Name Type Description Default
resource_config PostgresResourceConfig

A dictionary providing everything needed for a psycopg connection configuration

required

Raises:

Type Description
Error

If a DB connection cannot be made.

Returns:

Type Description
Connection

Connection object.

Source code in oteapi/strategies/parse/postgres.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def create_connection(resource_config: PostgresResourceConfig) -> psycopg.Connection:
    """Create a dbname connection to Postgres dbname.

    Parameters:
        resource_config: A dictionary providing everything needed for a psycopg
                     connection configuration

    Raises:
        psycopg.Error: If a DB connection cannot be made.

    Returns:
        Connection object.

    """
    try:
        return psycopg.connect(resource_config.accessUrl)
    except psycopg.Error as exc:
        raise psycopg.Error("Could not connect to given Postgres DB.") from exc

text_csv

Strategy class for text/csv.

CSVDialect: Type[Enum] = Enum(value='CSVDialect', names={dialect.upper(): dialectfor dialect in csv.list_dialects()}, module=__name__, type=str) module-attribute

CSV dialects.

All available dialects are retrieved through the csv.list_dialects() function, and will thus depend on the currently loaded and used Python interpreter.

CSVConfig

Bases: AttrDict

CSV parse-specific Configuration Data Model.

Source code in oteapi/strategies/parse/text_csv.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class CSVConfig(AttrDict):
    """CSV parse-specific Configuration Data Model."""

    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configurations for the data cache for storing the downloaded file content.",
    )
    dialect: DialectFormatting = Field(
        DialectFormatting(),
        description=(
            "Dialect and formatting parameters. See [the Python docs]"
            "(https://docs.python.org/3/library/csv.html#csv-fmt-params) for more "
            "information."
        ),
    )
    reader: ReaderConfig = Field(
        ReaderConfig(),
        description=(
            "CSV DictReader configuration parameters. See [the Python docs]"
            "(https://docs.python.org/3/library/csv.html#csv.DictReader) for more "
            "information."
        ),
    )
datacache_config: Optional[DataCacheConfig] = Field(None, description='Configurations for the data cache for storing the downloaded file content.') class-attribute instance-attribute
dialect: DialectFormatting = Field(DialectFormatting(), description='Dialect and formatting parameters. See [the Python docs](https://docs.python.org/3/library/csv.html#csv-fmt-params) for more information.') class-attribute instance-attribute
reader: ReaderConfig = Field(ReaderConfig(), description='CSV DictReader configuration parameters. See [the Python docs](https://docs.python.org/3/library/csv.html#csv.DictReader) for more information.') class-attribute instance-attribute

CSVParseStrategy

Parse strategy for CSV files.

Registers strategies:

  • ("mediaType", "text/csv")
Source code in oteapi/strategies/parse/text_csv.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
@dataclass
class CSVParseStrategy:
    """Parse strategy for CSV files.

    **Registers strategies**:

    - `("mediaType", "text/csv")`

    """

    parse_config: CSVResourceConfig

    def initialize(self, session: "Optional[dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize."""
        return SessionUpdate()

    def get(self, session: "Optional[dict[str, Any]]" = None) -> SessionUpdateCSVParse:
        """Parse CSV."""
        downloader = create_strategy("download", self.parse_config)
        output = downloader.get()
        cache = DataCache(self.parse_config.configuration.datacache_config)

        with cache.getfile(output["key"]) as csvfile_path:
            kwargs = self.parse_config.configuration.dialect.dict(
                exclude={"base", "quoting"}, exclude_unset=True
            )

            dialect = self.parse_config.configuration.dialect.base
            if dialect:
                kwargs["dialect"] = dialect.value
            quoting = self.parse_config.configuration.dialect.quoting
            if quoting:
                kwargs["quoting"] = quoting.csv_constant()

            kwargs.update(
                self.parse_config.configuration.reader.dict(exclude_unset=True)
            )

            with open(
                csvfile_path,
                newline="",
                encoding=self.parse_config.configuration.reader.encoding,
            ) as csvfile:
                csvreader = csv.DictReader(csvfile, **kwargs)
                content: dict[Union[str, None], list[Any]] = defaultdict(list)
                for row in csvreader:
                    for field, value in row.items():
                        if (
                            csvreader.reader.dialect.quoting == csv.QUOTE_NONNUMERIC
                            and isinstance(value, float)
                            and value.is_integer()
                        ):
                            value = int(value)
                        content[field].append(value)
            for key in list(content):
                if any(isinstance(value, float) for value in content[key]):
                    content[key] = [
                        float(value)
                        if (value or value == 0.0 or value == 0)
                        and value != csvreader.restval
                        else float("nan")
                        for value in content[key]
                    ]
                    continue
                if any(isinstance(value, int) for value in content[key]):
                    content[key] = [
                        int(value)
                        if (value or value == 0) and value != csvreader.restval
                        else csvreader.restval
                        for value in content[key]
                    ]

            return SessionUpdateCSVParse(content=content)
parse_config: CSVResourceConfig instance-attribute
get(session=None)

Parse CSV.

Source code in oteapi/strategies/parse/text_csv.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def get(self, session: "Optional[dict[str, Any]]" = None) -> SessionUpdateCSVParse:
    """Parse CSV."""
    downloader = create_strategy("download", self.parse_config)
    output = downloader.get()
    cache = DataCache(self.parse_config.configuration.datacache_config)

    with cache.getfile(output["key"]) as csvfile_path:
        kwargs = self.parse_config.configuration.dialect.dict(
            exclude={"base", "quoting"}, exclude_unset=True
        )

        dialect = self.parse_config.configuration.dialect.base
        if dialect:
            kwargs["dialect"] = dialect.value
        quoting = self.parse_config.configuration.dialect.quoting
        if quoting:
            kwargs["quoting"] = quoting.csv_constant()

        kwargs.update(
            self.parse_config.configuration.reader.dict(exclude_unset=True)
        )

        with open(
            csvfile_path,
            newline="",
            encoding=self.parse_config.configuration.reader.encoding,
        ) as csvfile:
            csvreader = csv.DictReader(csvfile, **kwargs)
            content: dict[Union[str, None], list[Any]] = defaultdict(list)
            for row in csvreader:
                for field, value in row.items():
                    if (
                        csvreader.reader.dialect.quoting == csv.QUOTE_NONNUMERIC
                        and isinstance(value, float)
                        and value.is_integer()
                    ):
                        value = int(value)
                    content[field].append(value)
        for key in list(content):
            if any(isinstance(value, float) for value in content[key]):
                content[key] = [
                    float(value)
                    if (value or value == 0.0 or value == 0)
                    and value != csvreader.restval
                    else float("nan")
                    for value in content[key]
                ]
                continue
            if any(isinstance(value, int) for value in content[key]):
                content[key] = [
                    int(value)
                    if (value or value == 0) and value != csvreader.restval
                    else csvreader.restval
                    for value in content[key]
                ]

        return SessionUpdateCSVParse(content=content)
initialize(session=None)

Initialize.

Source code in oteapi/strategies/parse/text_csv.py
264
265
266
def initialize(self, session: "Optional[dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize."""
    return SessionUpdate()

CSVResourceConfig

Bases: ResourceConfig

CSV parse strategy filter config.

Source code in oteapi/strategies/parse/text_csv.py
231
232
233
234
235
236
237
238
239
240
241
class CSVResourceConfig(ResourceConfig):
    """CSV parse strategy filter config."""

    mediaType: str = Field(
        "text/csv",
        const=True,
        description=ResourceConfig.__fields__["mediaType"].field_info.description,
    )
    configuration: CSVConfig = Field(
        CSVConfig(), description="CSV parse strategy-specific configuration."
    )
configuration: CSVConfig = Field(CSVConfig(), description='CSV parse strategy-specific configuration.') class-attribute instance-attribute
mediaType: str = Field('text/csv', const=True, description=ResourceConfig.__fields__['mediaType'].field_info.description) class-attribute instance-attribute

DialectFormatting

Bases: BaseModel

Dialect and formatting parameters for CSV.

See the Python docs for more information.

Note

As Dialect.lineterminator is hardcoded in csv.reader, it is left out of this model.

Source code in oteapi/strategies/parse/text_csv.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
class DialectFormatting(BaseModel):
    """Dialect and formatting parameters for CSV.

    See [the Python docs](https://docs.python.org/3/library/csv.html#csv-fmt-params)
    for more information.

    Note:
        As `Dialect.lineterminator` is hardcoded in `csv.reader`, it is left out of
        this model.

    """

    base: Optional[CSVDialect] = Field(
        None,
        description=(
            "A specific CSV dialect, e.g., 'excel'. Any other parameters here will "
            "overwrite the preset dialect parameters for the specified dialect."
        ),
    )
    delimiter: Optional[str] = Field(
        None,
        description=(
            "A one-character string used to separate fields. "
            "See [the Python docs entry](https://docs.python.org/3/library/csv.html"
            "#csv.Dialect.delimiter) for more information."
        ),
        min_length=1,
        max_length=1,
    )
    doublequote: Optional[bool] = Field(
        None,
        description=(
            "Controls how instances of [`quotechar`]"
            "[oteapi.strategies.parse.text_csv.DialectFormatting.quotechar] "
            "appearing inside a field should themselves be quoted. When `True`, the "
            "character is doubled. When `False`, the [`escapechar`]"
            "[oteapi.strategies.parse.text_csv.DialectFormatting.escapechar] "
            "is used as a prefix to the [`quotechar`]"
            "[oteapi.strategies.parse.text_csv.DialectFormatting.quotechar]. "
            "See [the Python docs entry]"
            "(https://docs.python.org/3/library/csv.html#csv.Dialect.doublequote) "
            "for more information."
        ),
    )
    escapechar: Optional[str] = Field(
        None,
        description=(
            "A one-character string used by the writer to escape the [`delimiter`]"
            "[oteapi.strategies.parse.text_csv.DialectFormatting.delimiter] if "
            "[`quoting`][oteapi.strategies.parse.text_csv.DialectFormatting.quoting] "
            "is set to [`QUOTE_NONE`]"
            "[oteapi.strategies.parse.text_csv.QuoteConstants.QUOTE_NONE] and the "
            "[`quotechar`][oteapi.strategies.parse.text_csv.DialectFormatting."
            "quotechar] if [`doublequote`][oteapi.strategies.parse.text_csv."
            "DialectFormatting.doublequote] is `False`. On reading, the "
            "[`escapechar`][oteapi.strategies.parse.text_csv.DialectFormatting."
            "escapechar] removes any special meaning from the following character. "
            "See [the Python docs entry]"
            "(https://docs.python.org/3/library/csv.html#csv.Dialect.escapechar) "
            "for more information."
        ),
        min_length=1,
        max_length=1,
    )
    quotechar: Optional[str] = Field(
        None,
        description=(
            "A one-character string used to quote fields containing special "
            "characters, such as the [`delimiter`]"
            "[oteapi.strategies.parse.text_csv.DialectFormatting.delimiter] or "
            "[`quotechar`][oteapi.strategies.parse.text_csv.DialectFormatting."
            "quotechar], or which contain new-line characters. See "
            "[the Python docs entry](https://docs.python.org/3/library/csv.html"
            "#csv.Dialect.quotechar) for more information."
        ),
        min_length=1,
        max_length=1,
    )
    quoting: Optional[QuoteConstants] = Field(
        None,
        description=(
            "Controls when quotes should be generated by the writer and recognised by "
            "the reader. It can take on any of the `QUOTE_*` constants (see section "
            "[Module Contents](https://docs.python.org/3/library/csv.html"
            "#csv-contents)). See [the Python docs entry]"
            "(https://docs.python.org/3/library/csv.html#csv.Dialect.quoting) "
            "for more information."
        ),
    )
    skipinitialspace: Optional[bool] = Field(
        None,
        description=(
            "When `True`, whitespace immediately following the [`delimiter`]"
            "[oteapi.strategies.parse.text_csv.DialectFormatting.delimiter] is "
            "ignored. See [the Python docs entry]"
            "(https://docs.python.org/3/library/csv.html#csv.Dialect.skipinitialspace)"
            " for more information."
        ),
    )
    strict: Optional[bool] = Field(
        None,
        description=(
            "When `True`, raise exception [Error]"
            "(https://docs.python.org/3/library/csv.html#csv.Error) on bad CSV input. "
            "See [the Python docs entry](https://docs.python.org/3/library/csv.html"
            "#csv.Dialect.strict) for more information."
        ),
    )

    @validator("base")
    def validate_dialect_base(cls, value: str) -> str:
        """Ensure the given `base` dialect is registered locally."""
        if value not in csv.list_dialects():
            raise ValueError(
                f"{value!r} is not a known registered CSV dialect. "
                f"Registered dialects: {', '.join(csv.list_dialects())}."
            )
        return value
base: Optional[CSVDialect] = Field(None, description="A specific CSV dialect, e.g., 'excel'. Any other parameters here will overwrite the preset dialect parameters for the specified dialect.") class-attribute instance-attribute
delimiter: Optional[str] = Field(None, description='A one-character string used to separate fields. See [the Python docs entry](https://docs.python.org/3/library/csv.html#csv.Dialect.delimiter) for more information.', min_length=1, max_length=1) class-attribute instance-attribute
doublequote: Optional[bool] = Field(None, description='Controls how instances of [`quotechar`][oteapi.strategies.parse.text_csv.DialectFormatting.quotechar] appearing inside a field should themselves be quoted. When `True`, the character is doubled. When `False`, the [`escapechar`][oteapi.strategies.parse.text_csv.DialectFormatting.escapechar] is used as a prefix to the [`quotechar`][oteapi.strategies.parse.text_csv.DialectFormatting.quotechar]. See [the Python docs entry](https://docs.python.org/3/library/csv.html#csv.Dialect.doublequote) for more information.') class-attribute instance-attribute
escapechar: Optional[str] = Field(None, description='A one-character string used by the writer to escape the [`delimiter`][oteapi.strategies.parse.text_csv.DialectFormatting.delimiter] if [`quoting`][oteapi.strategies.parse.text_csv.DialectFormatting.quoting] is set to [`QUOTE_NONE`][oteapi.strategies.parse.text_csv.QuoteConstants.QUOTE_NONE] and the [`quotechar`][oteapi.strategies.parse.text_csv.DialectFormatting.quotechar] if [`doublequote`][oteapi.strategies.parse.text_csv.DialectFormatting.doublequote] is `False`. On reading, the [`escapechar`][oteapi.strategies.parse.text_csv.DialectFormatting.escapechar] removes any special meaning from the following character. See [the Python docs entry](https://docs.python.org/3/library/csv.html#csv.Dialect.escapechar) for more information.', min_length=1, max_length=1) class-attribute instance-attribute
quotechar: Optional[str] = Field(None, description='A one-character string used to quote fields containing special characters, such as the [`delimiter`][oteapi.strategies.parse.text_csv.DialectFormatting.delimiter] or [`quotechar`][oteapi.strategies.parse.text_csv.DialectFormatting.quotechar], or which contain new-line characters. See [the Python docs entry](https://docs.python.org/3/library/csv.html#csv.Dialect.quotechar) for more information.', min_length=1, max_length=1) class-attribute instance-attribute
quoting: Optional[QuoteConstants] = Field(None, description='Controls when quotes should be generated by the writer and recognised by the reader. It can take on any of the `QUOTE_*` constants (see section [Module Contents](https://docs.python.org/3/library/csv.html#csv-contents)). See [the Python docs entry](https://docs.python.org/3/library/csv.html#csv.Dialect.quoting) for more information.') class-attribute instance-attribute
skipinitialspace: Optional[bool] = Field(None, description='When `True`, whitespace immediately following the [`delimiter`][oteapi.strategies.parse.text_csv.DialectFormatting.delimiter] is ignored. See [the Python docs entry](https://docs.python.org/3/library/csv.html#csv.Dialect.skipinitialspace) for more information.') class-attribute instance-attribute
strict: Optional[bool] = Field(None, description='When `True`, raise exception [Error](https://docs.python.org/3/library/csv.html#csv.Error) on bad CSV input. See [the Python docs entry](https://docs.python.org/3/library/csv.html#csv.Dialect.strict) for more information.') class-attribute instance-attribute
validate_dialect_base(value)

Ensure the given base dialect is registered locally.

Source code in oteapi/strategies/parse/text_csv.py
157
158
159
160
161
162
163
164
165
@validator("base")
def validate_dialect_base(cls, value: str) -> str:
    """Ensure the given `base` dialect is registered locally."""
    if value not in csv.list_dialects():
        raise ValueError(
            f"{value!r} is not a known registered CSV dialect. "
            f"Registered dialects: {', '.join(csv.list_dialects())}."
        )
    return value

QuoteConstants

Bases: str, Enum

CSV module QUOTE_* constants.

Source code in oteapi/strategies/parse/text_csv.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class QuoteConstants(str, Enum):
    """CSV module `QUOTE_*` constants."""

    QUOTE_ALL = "QUOTE_ALL"
    QUOTE_MINIMAL = "QUOTE_MINIMAL"
    QUOTE_NONUMERIC = "QUOTE_NONNUMERIC"
    QUOTE_NONE = "QUOTE_NONE"

    def csv_constant(self) -> int:
        """Return the CSV lib equivalent constant."""
        return {
            self.QUOTE_ALL: csv.QUOTE_ALL,
            self.QUOTE_MINIMAL: csv.QUOTE_MINIMAL,
            self.QUOTE_NONUMERIC: csv.QUOTE_NONNUMERIC,
            self.QUOTE_NONE: csv.QUOTE_NONE,
        }[self]
QUOTE_ALL = 'QUOTE_ALL' class-attribute instance-attribute
QUOTE_MINIMAL = 'QUOTE_MINIMAL' class-attribute instance-attribute
QUOTE_NONE = 'QUOTE_NONE' class-attribute instance-attribute
QUOTE_NONUMERIC = 'QUOTE_NONNUMERIC' class-attribute instance-attribute
csv_constant()

Return the CSV lib equivalent constant.

Source code in oteapi/strategies/parse/text_csv.py
24
25
26
27
28
29
30
31
def csv_constant(self) -> int:
    """Return the CSV lib equivalent constant."""
    return {
        self.QUOTE_ALL: csv.QUOTE_ALL,
        self.QUOTE_MINIMAL: csv.QUOTE_MINIMAL,
        self.QUOTE_NONUMERIC: csv.QUOTE_NONNUMERIC,
        self.QUOTE_NONE: csv.QUOTE_NONE,
    }[self]

ReaderConfig

Bases: BaseModel

CSV DictReader configuration parameters.

See the Python docs for more information.

Source code in oteapi/strategies/parse/text_csv.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
class ReaderConfig(BaseModel):
    """CSV DictReader configuration parameters.

    See [the Python docs](https://docs.python.org/3/library/csv.html#csv.DictReader)
    for more information.
    """

    fieldnames: Optional[list[str]] = Field(
        None,
        description=(
            "List of headers. If not set, the values in the first row of the CSV file "
            "will be used as the field names."
        ),
    )
    restkey: Optional[Hashable] = Field(
        None,
        description=(
            "If a row has more fields than [`fieldnames`]"
            "[oteapi.strategies.parse.text_csv.ReaderConfig.fieldnames], the "
            "remaining data is put in a list and stored with the field name specified "
            "by [`restkey`][oteapi.strategies.parse.text_csv.ReaderConfig.restkey]."
        ),
    )
    restval: Optional[Any] = Field(
        None,
        description=(
            "If a non-blank row has fewer fields than the length of [`fieldnames`]"
            "[oteapi.strategies.parse.text_csv.ReaderConfig.fieldnames], the missing "
            "values are filled-in with the value of [`restval`]"
            "[oteapi.strategies.parse.text_csv.ReaderConfig.restval]."
        ),
    )
    encoding: str = Field(
        "utf8",
        description="The file encoding.",
    )
encoding: str = Field('utf8', description='The file encoding.') class-attribute instance-attribute
fieldnames: Optional[list[str]] = Field(None, description='List of headers. If not set, the values in the first row of the CSV file will be used as the field names.') class-attribute instance-attribute
restkey: Optional[Hashable] = Field(None, description='If a row has more fields than [`fieldnames`][oteapi.strategies.parse.text_csv.ReaderConfig.fieldnames], the remaining data is put in a list and stored with the field name specified by [`restkey`][oteapi.strategies.parse.text_csv.ReaderConfig.restkey].') class-attribute instance-attribute
restval: Optional[Any] = Field(None, description='If a non-blank row has fewer fields than the length of [`fieldnames`][oteapi.strategies.parse.text_csv.ReaderConfig.fieldnames], the missing values are filled-in with the value of [`restval`][oteapi.strategies.parse.text_csv.ReaderConfig.restval].') class-attribute instance-attribute

SessionUpdateCSVParse

Bases: SessionUpdate

Class for returning values from CSV Parse.

Source code in oteapi/strategies/parse/text_csv.py
244
245
246
247
248
249
class SessionUpdateCSVParse(SessionUpdate):
    """Class for returning values from CSV Parse."""

    content: dict[Union[str, None], list[Any]] = Field(
        ..., description="Content of the CSV document."
    )
content: dict[Union[str, None], list[Any]] = Field(..., description='Content of the CSV document.') class-attribute instance-attribute

resource

transformation

celery_remote

Transformation Plugin that uses the Celery framework to call remote workers.

CELERY_APP = Celery(broker=f'redis://{REDIS_HOST}:{REDIS_PORT}', backend=f'redis://{REDIS_HOST}:{REDIS_PORT}') module-attribute

REDIS_HOST = os.getenv('OTEAPI_REDIS_HOST', 'redis') module-attribute

REDIS_PORT = int(os.getenv('OTEAPI_REDIS_PORT', '6379')) module-attribute

CeleryConfig

Bases: AttrDict

Celery configuration.

All fields here (including those added from the session through the get() method, as well as those added "anonymously") will be used as keyword arguments to the send_task() method for the Celery App.

Note

Using alias for the name field to favor populating it with task_name arguments, since this is the "original" field name. I.e., this is done for backwards compatibility.

Setting allow_population_by_field_name=True as pydantic model configuration in order to allow populating it using name as well as task_name.

Source code in oteapi/strategies/transformation/celery_remote.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class CeleryConfig(AttrDict, allow_population_by_field_name=True):
    """Celery configuration.

    All fields here (including those added from the session through the `get()` method,
    as well as those added "anonymously") will be used as keyword arguments to the
    `send_task()` method for the Celery App.

    Note:
        Using `alias` for the `name` field to favor populating it with `task_name`
        arguments, since this is the "original" field name. I.e., this is done for
        backwards compatibility.

    Setting `allow_population_by_field_name=True` as pydantic model configuration in
    order to allow populating it using `name` as well as `task_name`.

    """

    name: str = Field(..., description="A task name.", alias="task_name")
    args: list = Field(..., description="List of arguments for the task.")
args: list = Field(..., description='List of arguments for the task.') class-attribute instance-attribute
name: str = Field(..., description='A task name.', alias='task_name') class-attribute instance-attribute

CeleryRemoteStrategy

Submit job to remote Celery runner.

Registers strategies:

  • ("transformationType", "celery/remote")
Source code in oteapi/strategies/transformation/celery_remote.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
@dataclass
class CeleryRemoteStrategy:
    """Submit job to remote Celery runner.

    **Registers strategies**:

    - `("transformationType", "celery/remote")`

    """

    transformation_config: CeleryStrategyConfig

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateCelery:
        """Run a job, return a job ID."""
        if session:
            self._use_session(session)

        result: "Union[AsyncResult, Any]" = CELERY_APP.send_task(
            **self.transformation_config.configuration
        )
        return SessionUpdateCelery(celery_task_id=result.task_id)

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        # pylint: disable=unused-argument
        """Initialize a job."""
        return SessionUpdate()

    def status(self, task_id: str) -> TransformationStatus:
        """Get job status."""
        result = AsyncResult(id=task_id, app=CELERY_APP)
        return TransformationStatus(id=task_id, status=result.state)

    def _use_session(self, session: "Dict[str, Any]") -> None:
        """Update the configuration with values from the sesssion.

        Check all fields (non-aliased and aliased) in `CeleryConfig` if they exist in
        the session. Override the given field values in the current strategy-specific
        configuration (the `CeleryConfig` instance) with the values found in the
        session.

        Parameters:
            session: The current OTE session.

        """
        alias_mapping: dict[str, str] = {
            field.alias: field_name
            for field_name, field in CeleryConfig.__fields__.items()
        }

        fields = set(CeleryConfig.__fields__)
        fields |= {_.alias for _ in CeleryConfig.__fields__.values()}

        for field in fields:
            if field in session:
                setattr(
                    self.transformation_config.configuration,
                    alias_mapping[field],
                    session[field],
                )
transformation_config: CeleryStrategyConfig instance-attribute
get(session=None)

Run a job, return a job ID.

Source code in oteapi/strategies/transformation/celery_remote.py
85
86
87
88
89
90
91
92
93
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateCelery:
    """Run a job, return a job ID."""
    if session:
        self._use_session(session)

    result: "Union[AsyncResult, Any]" = CELERY_APP.send_task(
        **self.transformation_config.configuration
    )
    return SessionUpdateCelery(celery_task_id=result.task_id)
initialize(session=None)

Initialize a job.

Source code in oteapi/strategies/transformation/celery_remote.py
95
96
97
98
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    # pylint: disable=unused-argument
    """Initialize a job."""
    return SessionUpdate()
status(task_id)

Get job status.

Source code in oteapi/strategies/transformation/celery_remote.py
100
101
102
103
def status(self, task_id: str) -> TransformationStatus:
    """Get job status."""
    result = AsyncResult(id=task_id, app=CELERY_APP)
    return TransformationStatus(id=task_id, status=result.state)

CeleryStrategyConfig

Bases: TransformationConfig

Celery strategy-specific configuration.

Source code in oteapi/strategies/transformation/celery_remote.py
58
59
60
61
62
63
64
65
66
67
68
69
70
class CeleryStrategyConfig(TransformationConfig):
    """Celery strategy-specific configuration."""

    transformationType: str = Field(
        "celery/remote",
        const=True,
        description=TransformationConfig.__fields__[
            "transformationType"
        ].field_info.description,
    )
    configuration: CeleryConfig = Field(
        ..., description="Celery transformation strategy-specific configuration."
    )
configuration: CeleryConfig = Field(..., description='Celery transformation strategy-specific configuration.') class-attribute instance-attribute
transformationType: str = Field('celery/remote', const=True, description=TransformationConfig.__fields__['transformationType'].field_info.description) class-attribute instance-attribute

SessionUpdateCelery

Bases: SessionUpdate

Class for returning values from a Celery task.

Source code in oteapi/strategies/transformation/celery_remote.py
52
53
54
55
class SessionUpdateCelery(SessionUpdate):
    """Class for returning values from a Celery task."""

    celery_task_id: str = Field(..., description="A Celery task identifier.")
celery_task_id: str = Field(..., description='A Celery task identifier.') class-attribute instance-attribute