Skip to content

OTE-API Core Strategies

This page provides documentation for the oteapi.strategies submodule, where all the core OTE-API strategies are located.

These strategies will always be available when setting up a server based on the OTE-API Core package.

download special

file

Download strategy class for the file scheme.

FileConfig (BaseModel) pydantic-model

File-specific Configuration Data Model.

Source code in oteapi/strategies/download/file.py
class FileConfig(BaseModel):
    """File-specific Configuration Data Model."""

    # Selects how `FileStrategy.get()` reads the file: `Path.read_text()` when
    # true, `Path.read_bytes()` otherwise. Defaults to bytes mode (`False`).
    text: bool = Field(
        False,
        description=(
            "Whether the file should be opened in text mode. If `False`, the file will"
            " be opened in bytes mode."
        ),
    )
    # Forwarded as `encoding=` to `Path.read_text()`; only consulted when `text`
    # is true. `None` leaves the choice of encoding to `read_text()`.
    encoding: Optional[str] = Field(
        None,
        description=(
            "Encoding used when opening the file. The default is platform dependent."
        ),
    )
encoding: str pydantic-field

Encoding used when opening the file. The default is platform dependent.

text: bool pydantic-field

Whether the file should be opened in text mode. If False, the file will be opened in bytes mode.

FileStrategy dataclass

Strategy for retrieving data from a local file.

Registers strategies:

  • ("scheme", "file")
Source code in oteapi/strategies/download/file.py
@dataclass
class FileStrategy:
    """Download strategy that reads a file from the local file system.

    **Registers strategies**:

    - `("scheme", "file")`

    """

    download_config: "ResourceConfig"

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; this strategy has no setup step."""
        empty_update = SessionUpdate()
        return empty_update

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateFile:
        """Store the content of the local file in the data cache.

        Parameters:
            session: Unused by this strategy.

        Returns:
            A session update holding the cache key of the file content.

        Raises:
            ValueError: If `downloadUrl` is missing or its scheme is not `file`.

        """
        url = self.download_config.downloadUrl
        if url is None or url.scheme != "file":
            raise ValueError(
                "Expected 'downloadUrl' to have scheme 'file' in the configuration."
            )

        filename = Path(url.path).resolve()
        if isinstance(filename, PosixPath):
            # On POSIX the URL host is used as the leading path component.
            filename = Path("/" + url.host + str(filename))

        cache = DataCache(self.download_config.configuration)
        access_key = cache.config.accessKey
        if access_key and access_key in cache:
            # Content is already cached under the configured key; skip reading.
            return SessionUpdateFile(key=access_key)

        config = FileConfig(**self.download_config.configuration)
        if config.text:
            content = filename.read_text(encoding=config.encoding)
        else:
            content = filename.read_bytes()
        return SessionUpdateFile(key=cache.add(content))
get(self, session=None)

Read local file.

Source code in oteapi/strategies/download/file.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateFile:
    """Store the content of the local file in the data cache.

    Parameters:
        session: Unused by this strategy.

    Returns:
        A session update holding the cache key of the file content.

    Raises:
        ValueError: If `downloadUrl` is missing or its scheme is not `file`.

    """
    url = self.download_config.downloadUrl
    if url is None or url.scheme != "file":
        raise ValueError(
            "Expected 'downloadUrl' to have scheme 'file' in the configuration."
        )

    filename = Path(url.path).resolve()
    if isinstance(filename, PosixPath):
        # On POSIX the URL host is used as the leading path component.
        filename = Path("/" + url.host + str(filename))

    cache = DataCache(self.download_config.configuration)
    access_key = cache.config.accessKey
    if access_key and access_key in cache:
        # Content is already cached under the configured key; skip reading.
        return SessionUpdateFile(key=access_key)

    config = FileConfig(**self.download_config.configuration)
    if config.text:
        content = filename.read_text(encoding=config.encoding)
    else:
        content = filename.read_bytes()
    return SessionUpdateFile(key=cache.add(content))
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/download/file.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; the file strategy has no setup step."""
    empty_update = SessionUpdate()
    return empty_update

SessionUpdateFile (SessionUpdate) pydantic-model

Class for returning values from Download File strategy.

Source code in oteapi/strategies/download/file.py
class SessionUpdateFile(SessionUpdate):
    """Class for returning values from Download File strategy."""

    # Required field (`...` default). `FileStrategy.get()` fills it with either the
    # configured access key or the key returned by `cache.add()`.
    key: str = Field(..., description="Key to access the data in the cache.")
key: str pydantic-field required

Key to access the data in the cache.

https

Download strategy class for http/https

HTTPSStrategy dataclass

Strategy for retrieving data via http.

Registers strategies:

  • ("scheme", "http")
  • ("scheme", "https")
Source code in oteapi/strategies/download/https.py
@dataclass
class HTTPSStrategy:
    """Download strategy that fetches a resource over HTTP or HTTPS.

    **Registers strategies**:

    - `("scheme", "http")`
    - `("scheme", "https")`

    """

    download_config: "ResourceConfig"

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; this strategy has no setup step."""
        empty_update = SessionUpdate()
        return empty_update

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateHTTPS:
        """Download the resource and store the response body in the data cache.

        Parameters:
            session: Unused by this strategy.

        Returns:
            A session update holding the cache key of the downloaded content.

        Raises:
            ValueError: If the content is not cached and `downloadUrl` is unset.

        """
        cache = DataCache(self.download_config.configuration)
        access_key = cache.config.accessKey
        if access_key and access_key in cache:
            # Already cached under the configured key; no request is made.
            return SessionUpdateHTTPS(key=access_key)

        url = self.download_config.downloadUrl
        if not url:
            raise ValueError("downloadUrl not defined in configuration.")

        # NOTE(review): no timeout and no HTTP status check here; the body of an
        # error response would be cached as-is - confirm this is intended.
        response = requests.get(url, allow_redirects=True)
        return SessionUpdateHTTPS(key=cache.add(response.content))
get(self, session=None)

Download via http/https and store on local cache.

Source code in oteapi/strategies/download/https.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateHTTPS:
    """Download the resource and store the response body in the data cache.

    Parameters:
        session: Unused by this strategy.

    Returns:
        A session update holding the cache key of the downloaded content.

    Raises:
        ValueError: If the content is not cached and `downloadUrl` is unset.

    """
    cache = DataCache(self.download_config.configuration)
    access_key = cache.config.accessKey
    if access_key and access_key in cache:
        # Already cached under the configured key; no request is made.
        return SessionUpdateHTTPS(key=access_key)

    url = self.download_config.downloadUrl
    if not url:
        raise ValueError("downloadUrl not defined in configuration.")

    # NOTE(review): no timeout and no HTTP status check here; the body of an
    # error response would be cached as-is - confirm this is intended.
    response = requests.get(url, allow_redirects=True)
    return SessionUpdateHTTPS(key=cache.add(response.content))
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/download/https.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; the HTTP(S) strategy has no setup step."""
    empty_update = SessionUpdate()
    return empty_update

SessionUpdateHTTPS (SessionUpdate) pydantic-model

Class for returning values from Download HTTPS strategy.

Source code in oteapi/strategies/download/https.py
class SessionUpdateHTTPS(SessionUpdate):
    """Class for returning values from Download HTTPS strategy."""

    # Required field (`...` default). `HTTPSStrategy.get()` fills it with either the
    # configured access key or the key returned by `cache.add()`.
    key: str = Field(..., description="Key to access the data in the cache.")
key: str pydantic-field required

Key to access the data in the cache.

sftp

Strategy class for sftp/ftp

SFTPStrategy dataclass

Strategy for retrieving data via sftp.

Registers strategies:

  • ("scheme", "ftp")
  • ("scheme", "sftp")
Source code in oteapi/strategies/download/sftp.py
@dataclass
class SFTPStrategy:
    """Download strategy that fetches a remote file through an SFTP connection.

    **Registers strategies**:

    - `("scheme", "ftp")`
    - `("scheme", "sftp")`

    """

    download_config: "ResourceConfig"

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; this strategy has no setup step."""
        empty_update = SessionUpdate()
        return empty_update

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateSFTP:
        """Fetch the remote file, store its bytes in the cache and return the key.

        Parameters:
            session: Unused by this strategy.

        Returns:
            A session update holding the cache key of the downloaded content.

        Raises:
            ValueError: If the content is not cached and `downloadUrl` is unset.

        """
        cache = DataCache(self.download_config.configuration)
        access_key = cache.config.accessKey
        if access_key and access_key in cache:
            # Already cached under the configured key; no connection is opened.
            return SessionUpdateSFTP(key=access_key)

        # NOTE(review): setting `hostkeys` to None presumably turns off host-key
        # verification in pysftp - confirm this is acceptable for deployment.
        options = pysftp.CnOpts()
        options.hostkeys = None

        url = self.download_config.downloadUrl
        if not url:
            raise ValueError("downloadUrl is not defined in configuration.")

        with pysftp.Connection(
            host=url.host,
            username=url.user,
            password=url.password,
            port=url.port,
            cnopts=options,
        ) as connection:
            # Only reserve a file name here: the handle is closed again before
            # the download, since Windows would otherwise keep the file locked.
            with NamedTemporaryFile(prefix="oteapi-sftp-", delete=False) as tmp:
                local_file = Path(tmp.name).resolve()
            try:
                connection.get(url.path, localpath=local_file)
                key = cache.add(local_file.read_bytes())
            finally:
                # `delete=False` above means the temporary file is removed here.
                local_file.unlink()

        return SessionUpdateSFTP(key=key)
get(self, session=None)

Download via sftp

Source code in oteapi/strategies/download/sftp.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateSFTP:
    """Fetch the remote file, store its bytes in the cache and return the key.

    Parameters:
        session: Unused by this strategy.

    Returns:
        A session update holding the cache key of the downloaded content.

    Raises:
        ValueError: If the content is not cached and `downloadUrl` is unset.

    """
    cache = DataCache(self.download_config.configuration)
    access_key = cache.config.accessKey
    if access_key and access_key in cache:
        # Already cached under the configured key; no connection is opened.
        return SessionUpdateSFTP(key=access_key)

    # NOTE(review): setting `hostkeys` to None presumably turns off host-key
    # verification in pysftp - confirm this is acceptable for deployment.
    options = pysftp.CnOpts()
    options.hostkeys = None

    url = self.download_config.downloadUrl
    if not url:
        raise ValueError("downloadUrl is not defined in configuration.")

    with pysftp.Connection(
        host=url.host,
        username=url.user,
        password=url.password,
        port=url.port,
        cnopts=options,
    ) as connection:
        # Only reserve a file name here: the handle is closed again before
        # the download, since Windows would otherwise keep the file locked.
        with NamedTemporaryFile(prefix="oteapi-sftp-", delete=False) as tmp:
            local_file = Path(tmp.name).resolve()
        try:
            connection.get(url.path, localpath=local_file)
            key = cache.add(local_file.read_bytes())
        finally:
            # `delete=False` above means the temporary file is removed here.
            local_file.unlink()

    return SessionUpdateSFTP(key=key)
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/download/sftp.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; the SFTP strategy has no setup step."""
    empty_update = SessionUpdate()
    return empty_update

SessionUpdateSFTP (SessionUpdate) pydantic-model

Class for returning values from Download SFTP strategy.

Source code in oteapi/strategies/download/sftp.py
class SessionUpdateSFTP(SessionUpdate):
    """Class for returning values from Download SFTP strategy."""

    # Required field (`...` default). `SFTPStrategy.get()` fills it with either the
    # configured access key or the key returned by `cache.add()`.
    key: str = Field(..., description="Key to access the data in the cache.")
key: str pydantic-field required

Key to access the data in the cache.

filter special

crop_filter

Demo-filter strategy

CropFilter dataclass

Strategy for cropping an image.

Registers strategies:

  • ("filterType", "filter/crop")
Source code in oteapi/strategies/filter/crop_filter.py
@dataclass
class CropFilter:
    """Demo filter strategy that forwards image cropping details.

    **Registers strategies**:

    - `("filterType", "filter/crop")`

    """

    filter_config: "FilterConfig"

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; this filter has no setup step."""
        empty_update = SessionUpdate()
        return empty_update

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateCrop:
        """Build the crop session update from the filter configuration.

        Parameters:
            session: Unused by this strategy.

        Returns:
            A `SessionUpdateCrop` created from the configuration entries.

        """
        configuration = self.filter_config.configuration
        if configuration:
            return SessionUpdateCrop(**configuration)
        # NOTE(review): `SessionUpdateCrop` declares `crop` as required, so this
        # argument-less call presumably fails validation - confirm.
        return SessionUpdateCrop()
get(self, session=None)

Execute strategy and return a dictionary

Source code in oteapi/strategies/filter/crop_filter.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateCrop:
    """Build the crop session update from the filter configuration.

    Parameters:
        session: Unused by this strategy.

    Returns:
        A `SessionUpdateCrop` created from the configuration entries.

    """
    configuration = self.filter_config.configuration
    if configuration:
        return SessionUpdateCrop(**configuration)
    # NOTE(review): `SessionUpdateCrop` declares `crop` as required, so this
    # argument-less call presumably fails validation - confirm.
    return SessionUpdateCrop()
initialize(self, session=None)

Initialize strategy and return a dictionary.

Source code in oteapi/strategies/filter/crop_filter.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; the crop filter has no setup step."""
    empty_update = SessionUpdate()
    return empty_update

SessionUpdateCrop (SessionUpdate) pydantic-model

Class for returning values from crop data.

Source code in oteapi/strategies/filter/crop_filter.py
class SessionUpdateCrop(SessionUpdate):
    """Class for returning values from crop data."""

    # Required field (`...` default), normally supplied through the filter
    # configuration that `CropFilter.get()` unpacks into this model.
    # NOTE(review): `CropFilter.get()` also calls `SessionUpdateCrop()` without
    # arguments when the configuration is empty - confirm that path can work.
    crop: List[int] = Field(..., description="List of image cropping details.")
crop: List[int] pydantic-field required

List of image cropping details.

sql_query_filter

SQL query filter strategy.

SQLQueryFilter dataclass

Strategy for a SQL query filter.

Registers strategies:

  • ("filterType", "filter/sql")
Source code in oteapi/strategies/filter/sql_query_filter.py
@dataclass
class SQLQueryFilter:
    """Filter strategy that publishes a SQL query to the session.

    **Registers strategies**:

    - `("filterType", "filter/sql")`

    """

    filter_config: "FilterConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> SessionUpdateSqlQuery:
        """Expose the configured SQL query as a session update.

        Parameters:
            session: Unused by this strategy.

        Returns:
            A session update whose `query` is `filter_config.query`.

        """
        sql_query = self.filter_config.query
        return SessionUpdateSqlQuery(query=sql_query)

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; all work happens in `initialize()`."""
        empty_update = SessionUpdate()
        return empty_update
get(self, session=None)

Execute strategy and return a dictionary

Source code in oteapi/strategies/filter/sql_query_filter.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; the SQL filter does no work in `get()`."""
    empty_update = SessionUpdate()
    return empty_update
initialize(self, session=None)

Initialize strategy and return a dictionary

Source code in oteapi/strategies/filter/sql_query_filter.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> SessionUpdateSqlQuery:
    """Expose the configured SQL query as a session update.

    Parameters:
        session: Unused by this strategy.

    Returns:
        A session update whose `query` is `filter_config.query`.

    """
    sql_query = self.filter_config.query
    return SessionUpdateSqlQuery(query=sql_query)

SessionUpdateSqlQuery (SessionUpdate) pydantic-model

Class for returning values from SQL Query data model.

Source code in oteapi/strategies/filter/sql_query_filter.py
class SessionUpdateSqlQuery(SessionUpdate):
    """Class for returning values from SQL Query data model."""

    # Required field (`...` default). `SQLQueryFilter.initialize()` fills it from
    # `filter_config.query`.
    query: str = Field(..., description="A SQL query string.")
query: str pydantic-field required

A SQL query string.

parse special

application_json

Strategy class for application/json.

JSONDataParseStrategy dataclass

Parse strategy for JSON.

Registers strategies:

  • ("mediaType", "application/json")
Source code in oteapi/strategies/parse/application_json.py
@dataclass
class JSONDataParseStrategy:
    """Parse strategy that turns a downloaded JSON document into a dict.

    **Registers strategies**:

    - `("mediaType", "application/json")`

    """

    parse_config: "ResourceConfig"

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; this strategy has no setup step."""
        empty_update = SessionUpdate()
        return empty_update

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateJSONParse:
        """Download the resource and return its JSON content.

        Parameters:
            session: Unused by this strategy.

        Returns:
            A session update whose `content` is the parsed JSON document.

        """
        # Let the matching download strategy put the raw data in the cache.
        download_result = create_strategy("download", self.parse_config).get()

        cache = DataCache(self.parse_config.configuration)
        payload = cache.get(download_result["key"])

        # The cache may hand back a dict directly; anything else is decoded.
        if not isinstance(payload, dict):
            payload = json.loads(payload)
        return SessionUpdateJSONParse(content=payload)
get(self, session=None)

Parse json.

Source code in oteapi/strategies/parse/application_json.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateJSONParse:
    """Download the resource and return its JSON content.

    Parameters:
        session: Unused by this strategy.

    Returns:
        A session update whose `content` is the parsed JSON document.

    """
    # Let the matching download strategy put the raw data in the cache.
    download_result = create_strategy("download", self.parse_config).get()

    cache = DataCache(self.parse_config.configuration)
    payload = cache.get(download_result["key"])

    # The cache may hand back a dict directly; anything else is decoded.
    if not isinstance(payload, dict):
        payload = json.loads(payload)
    return SessionUpdateJSONParse(content=payload)
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/parse/application_json.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; the JSON parser has no setup step."""
    empty_update = SessionUpdate()
    return empty_update

SessionUpdateJSONParse (SessionUpdate) pydantic-model

Class for returning values from JSON Parse.

Source code in oteapi/strategies/parse/application_json.py
class SessionUpdateJSONParse(SessionUpdate):
    """Class for returning values from JSON Parse."""

    # Required field (`...` default). `JSONDataParseStrategy.get()` fills it with
    # the cached dict, or with the result of `json.loads()` on the cached data.
    content: dict = Field(..., description="Content of the JSON document.")
content: dict pydantic-field required

Content of the JSON document.

application_vnd_sqlite

Strategy class for application/vnd.sqlite3.

SessionUpdateSqLiteParse (SessionUpdate) pydantic-model

Configuration model for SqLiteParse.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
class SessionUpdateSqLiteParse(SessionUpdate):
    """Configuration model for SqLiteParse."""

    # Rows returned by `fetchall()` in `SqliteParseStrategy.get()`; stays `None`
    # when the session holds no query.
    result: Optional[list] = Field(None, description="List of results from the query.")
    # Required status text; the strategy sets "Query executed" or "No query given".
    # NOTE(review): the description below misspells "Message" as "Messsage".
    msg: str = Field(..., description="Messsage concerning the execution of the query.")
msg: str pydantic-field required

Message concerning the execution of the query.

result: list pydantic-field

List of results from the query.

SqliteParseStrategy dataclass

Parse strategy for SQLite.

Registers strategies:

  • ("mediaType", "application/vnd.sqlite3")
Source code in oteapi/strategies/parse/application_vnd_sqlite.py
@dataclass
class SqliteParseStrategy:
    """Parse strategy for SQLite.

    **Registers strategies**:

    - `("mediaType", "application/vnd.sqlite3")`

    """

    parse_config: "ResourceConfig"

    def get(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> SessionUpdateSqLiteParse:
        """Run the session's SQL query against the session's SQLite file.

        Parameters:
            session: Session data. The query is read from `session["sqlquery"]`
                and the database file from `session["filename"]`.

        Returns:
            The fetched rows with the message "Query executed", or only the
            message "No query given" when the session holds no `sqlquery`.

        Raises:
            ValueError: If `session` is `None`.

        """
        if session is None:
            raise ValueError("Missing session")

        # NOTE(review): `SQLQueryFilter.initialize()` publishes the query under
        # the key "query", while this method reads "sqlquery" - confirm which
        # key the session is expected to carry.
        if "sqlquery" not in session:
            return SessionUpdateSqLiteParse(msg="No query given")

        connection = create_connection(session["filename"])
        try:
            rows = connection.cursor().execute(session["sqlquery"]).fetchall()
        finally:
            # Always release the database handle. `create_connection()` returns
            # None on failure, so only close a real connection.
            if connection is not None:
                connection.close()
        return SessionUpdateSqLiteParse(result=rows, msg="Query executed")

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; this strategy has no setup step."""
        return SessionUpdate()
get(self, session=None)

Parse SQLite query responses.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
def get(
    self, session: "Optional[Dict[str, Any]]" = None
) -> SessionUpdateSqLiteParse:
    """Run the session's SQL query against the session's SQLite file.

    Parameters:
        session: Session data. The query is read from `session["sqlquery"]`
            and the database file from `session["filename"]`.

    Returns:
        The fetched rows with the message "Query executed", or only the
        message "No query given" when the session holds no `sqlquery`.

    Raises:
        ValueError: If `session` is `None`.

    """
    if session is None:
        raise ValueError("Missing session")

    if "sqlquery" not in session:
        return SessionUpdateSqLiteParse(msg="No query given")

    connection = create_connection(session["filename"])
    try:
        rows = connection.cursor().execute(session["sqlquery"]).fetchall()
    finally:
        # Always release the database handle. `create_connection()` returns
        # None on failure, so only close a real connection.
        if connection is not None:
            connection.close()
    return SessionUpdateSqLiteParse(result=rows, msg="Query executed")
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; the SQLite parser has no setup step."""
    empty_update = SessionUpdate()
    return empty_update

create_connection(db_file)

Create a database connection to the SQLite database specified by db_file. Parameter db_file: the database file. Returns: a Connection object, or None.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
def create_connection(db_file):
    """Open a connection to the SQLite database stored in `db_file`.

    Parameters:
        db_file: Database file to connect to.

    Returns:
        The open connection, or `None` if connecting raised `sqlite3.Error`
        (the error is printed to standard output in that case).

    """
    try:
        return sqlite3.connect(db_file)
    except sqlite3.Error as error:
        print(error)
        return None

excel_xlsx

Strategy class for workbook/xlsx.

SessionUpdateXLSXParse (SessionUpdate) pydantic-model

Class for returning values from XLSXParse.

Source code in oteapi/strategies/parse/excel_xlsx.py
class SessionUpdateXLSXParse(SessionUpdate):
    """Class for returning values from XLSXParse."""

    # Required field (`...` default). `XLSXParseStrategy.get()` builds it by
    # zipping the header names with the transposed rows of the selected region.
    data: Dict[str, list] = Field(
        ...,
        description="A dict with column-name/column-value pairs. The values are lists.",
    )
data: Dict[str, list] pydantic-field required

A dict with column-name/column-value pairs. The values are lists.

XLSXParseDataModel (BaseModel) pydantic-model

Data model for retrieving a rectangular section of an Excel sheet.

Source code in oteapi/strategies/parse/excel_xlsx.py
class XLSXParseDataModel(BaseModel):
    """Data model for retrieving a rectangular section of an Excel sheet."""

    # Required; used as the lookup key into the loaded workbook.
    worksheet: str = Field(..., description="Name of worksheet to load.")
    # The four bounds below may be left as `None`; `set_model_defaults()` then
    # fills them in from the worksheet. Column bounds given as labels (strings)
    # are converted to integer indices there as well.
    row_from: Optional[int] = Field(
        None,
        description="Excel row number of first row. Defaults to first assigned row.",
    )
    col_from: Optional[Union[int, str]] = Field(
        None,
        description=(
            "Excel column number or label of first column. Defaults to first assigned "
            "column."
        ),
    )
    row_to: Optional[int] = Field(
        None, description="Excel row number of last row. Defaults to last assigned row."
    )
    col_to: Optional[Union[int, str]] = Field(
        None,
        description=(
            "Excel column number or label of last column. Defaults to last assigned "
            "column."
        ),
    )
    # `set_model_defaults()` sets this to 1 when `header` is given without it.
    header_row: Optional[int] = Field(
        None,
        description=(
            "Row number with the headers. Defaults to `1` if header is given, "
            "otherwise `None`."
        ),
    )
    # `get_column_indices()` maps these names to columns via the cells of
    # `header_row`.
    # NOTE(review): the description below reads "These names they should match";
    # the word "they" looks like a leftover.
    header: Optional[List[str]] = Field(
        None,
        description=(
            "Optional list of column names, specifying the columns to return. "
            "These names they should match cells in `header_row`."
        ),
    )
    # Must have as many entries as there are selected columns; the strategy
    # raises `TypeError` otherwise. `None` entries keep the original name.
    new_header: Optional[List[str]] = Field(
        None,
        description=(
            "Optional list of new column names replacing `header` in the output."
        ),
    )
    # Replaces `configuration` on the copied resource config that is handed to
    # the download strategy.
    download_config: AttrDict = Field(
        AttrDict(),
        description="Configurations provided to a download strategy.",
    )
col_from: Union[int, str] pydantic-field

Excel column number or label of first column. Defaults to first assigned column.

col_to: Union[int, str] pydantic-field

Excel column number or label of last column. Defaults to last assigned column.

download_config: AttrDict pydantic-field

Configurations provided to a download strategy.

header: List[str] pydantic-field

Optional list of column names, specifying the columns to return. These names should match cells in header_row.

header_row: int pydantic-field

Row number with the headers. Defaults to 1 if header is given, otherwise None.

new_header: List[str] pydantic-field

Optional list of new column names replacing header in the output.

row_from: int pydantic-field

Excel row number of first row. Defaults to first assigned row.

row_to: int pydantic-field

Excel row number of last row. Defaults to last assigned row.

worksheet: str pydantic-field required

Name of worksheet to load.

XLSXParseStrategy dataclass

Parse strategy for Excel XLSX files.

Registers strategies:

  • ("mediaType", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
Source code in oteapi/strategies/parse/excel_xlsx.py
@dataclass
class XLSXParseStrategy:
    """Parse strategy for Excel XLSX files.

    **Registers strategies**:

    - `("mediaType", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")`

    """

    parse_config: "ResourceConfig"

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; this strategy has no setup step."""
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateXLSXParse:
        """Parse the selected region of an Excel file.

        The workbook is downloaded through the matching download strategy, read
        in read-only mode and closed again before the result is assembled.

        Parameters:
            session: Unused by this strategy.

        Returns:
            A dict with column-name/column-value pairs. The values are lists.

        Raises:
            TypeError: If the length of `new_header` does not match the number
                of selected columns.

        """
        model = XLSXParseDataModel(**self.parse_config.configuration)

        download_config = self.parse_config.copy()
        download_config.configuration = model.download_config
        downloader = create_strategy("download", download_config)
        output = downloader.get()

        cache = DataCache(self.parse_config.configuration)
        with cache.getfile(key=output["key"], suffix=".xlsx") as filename:
            workbook = load_workbook(filename=filename, read_only=True, data_only=True)
            # Read-only workbooks load lazily, so every cell is read while the
            # file is still provided, and the workbook is closed explicitly.
            try:
                worksheet = workbook[model.worksheet]
                set_model_defaults(model, worksheet)
                columns = list(get_column_indices(model, worksheet))
                first_col, last_col = min(columns), max(columns)

                # iter_rows() yields tuples that start at `first_col`, so cells
                # are addressed relative to it rather than by absolute column.
                data = [
                    [row[col - first_col].value for col in columns]
                    for row in worksheet.iter_rows(
                        min_row=model.row_from,
                        max_row=model.row_to,
                        min_col=first_col,
                        max_col=last_col,
                    )
                ]

                if model.header_row:
                    header_cells = next(
                        worksheet.iter_rows(
                            min_row=model.header_row,
                            max_row=model.header_row,
                            min_col=first_col,
                            max_col=last_col,
                        )
                    )
                    header = [header_cells[col - first_col].value for col in columns]
                else:
                    header = None
            finally:
                workbook.close()

        if model.new_header:
            nhead = len(header) if header else len(data[0]) if data else 0
            if len(model.new_header) != nhead:
                raise TypeError(
                    f"length of `new_header` (={len(model.new_header)}) "
                    f"doesn't match number of columns (={nhead})"
                )
            if header:
                # `None` entries in `new_header` keep the original column name.
                for i, val in enumerate(model.new_header):
                    if val is not None:
                        header[i] = val
            elif data:
                header = model.new_header

        if header is None:
            # Fall back to positional labels ("A", "B", ...), one per selected
            # column (not one per data row).
            header = [get_column_letter(col + 1) for col in range(len(columns))]

        transposed = [list(datum) for datum in zip(*data)]
        return SessionUpdateXLSXParse(data=dict(zip(header, transposed)))
get(self, session=None)

Parses selected region of an excel file.

Returns:

Type Description
SessionUpdateXLSXParse

A dict with column-name/column-value pairs. The values are lists.

Source code in oteapi/strategies/parse/excel_xlsx.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateXLSXParse:
    """Parse the selected region of an Excel file.

    The workbook is downloaded through the matching download strategy, read
    in read-only mode and closed again before the result is assembled.

    Parameters:
        session: Unused by this strategy.

    Returns:
        A dict with column-name/column-value pairs. The values are lists.

    Raises:
        TypeError: If the length of `new_header` does not match the number
            of selected columns.

    """
    model = XLSXParseDataModel(**self.parse_config.configuration)

    download_config = self.parse_config.copy()
    download_config.configuration = model.download_config
    downloader = create_strategy("download", download_config)
    output = downloader.get()

    cache = DataCache(self.parse_config.configuration)
    with cache.getfile(key=output["key"], suffix=".xlsx") as filename:
        workbook = load_workbook(filename=filename, read_only=True, data_only=True)
        # Read-only workbooks load lazily, so every cell is read while the
        # file is still provided, and the workbook is closed explicitly.
        try:
            worksheet = workbook[model.worksheet]
            set_model_defaults(model, worksheet)
            columns = list(get_column_indices(model, worksheet))
            first_col, last_col = min(columns), max(columns)

            # iter_rows() yields tuples that start at `first_col`, so cells
            # are addressed relative to it rather than by absolute column.
            data = [
                [row[col - first_col].value for col in columns]
                for row in worksheet.iter_rows(
                    min_row=model.row_from,
                    max_row=model.row_to,
                    min_col=first_col,
                    max_col=last_col,
                )
            ]

            if model.header_row:
                header_cells = next(
                    worksheet.iter_rows(
                        min_row=model.header_row,
                        max_row=model.header_row,
                        min_col=first_col,
                        max_col=last_col,
                    )
                )
                header = [header_cells[col - first_col].value for col in columns]
            else:
                header = None
        finally:
            workbook.close()

    if model.new_header:
        nhead = len(header) if header else len(data[0]) if data else 0
        if len(model.new_header) != nhead:
            raise TypeError(
                f"length of `new_header` (={len(model.new_header)}) "
                f"doesn't match number of columns (={nhead})"
            )
        if header:
            # `None` entries in `new_header` keep the original column name.
            for i, val in enumerate(model.new_header):
                if val is not None:
                    header[i] = val
        elif data:
            header = model.new_header

    if header is None:
        # Fall back to positional labels ("A", "B", ...), one per selected
        # column (not one per data row).
        header = [get_column_letter(col + 1) for col in range(len(columns))]

    transposed = [list(datum) for datum in zip(*data)]
    return SessionUpdateXLSXParse(data=dict(zip(header, transposed)))
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/parse/excel_xlsx.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; the XLSX parser has no setup step."""
    empty_update = SessionUpdate()
    return empty_update

get_column_indices(model, worksheet)

Helper function returning a list of column indices.

Parameters:

Name Type Description Default
model XLSXParseDataModel

The parsed data model.

required
worksheet Worksheet

Excel worksheet, from which the header values will be retrieved.

required

Returns:

Type Description
Iterable[int]

A list of column indices.

Source code in oteapi/strategies/parse/excel_xlsx.py
def get_column_indices(
    model: XLSXParseDataModel, worksheet: "Worksheet"
) -> "Iterable[int]":
    """Work out which worksheet columns the data model selects.

    Parameters:
        model: The parsed data model. Its `col_from` and `col_to` must already
            be integers.
        worksheet: Excel worksheet, from which the header values will be retrieved.

    Returns:
        The full `col_from`..`col_to` range when no `header` is given,
        otherwise the columns whose `header_row` cell matches each requested
        header name, in the order of `model.header`.

    Raises:
        TypeError: If `model.col_from` or `model.col_to` is not an integer.

    """
    first, last = model.col_from, model.col_to
    if not (isinstance(first, int) and isinstance(last, int)):
        raise TypeError("Expected `model.col_from` and `model.col_to` to be integers.")

    candidate_columns = range(first, last + 1)
    if not model.header:
        return candidate_columns

    # Map every header cell value in the range to its column index.
    column_by_title = {}
    for col in candidate_columns:
        column_by_title[worksheet.cell(model.header_row, col).value] = col
    return [column_by_title[title] for title in model.header]

set_model_defaults(model, worksheet)

Update data model `model` with default values obtained from `worksheet`.

Parameters:

Name Type Description Default
model XLSXParseDataModel

The parsed data model.

required
worksheet Worksheet

Excel worksheet, from which the default values will be obtained.

required
Source code in oteapi/strategies/parse/excel_xlsx.py
def set_model_defaults(model: XLSXParseDataModel, worksheet: "Worksheet") -> None:
    """Update data model `model` with default values obtained from `worksheet`.

    Only unset fields are filled in. `model` is modified in place.

    Parameters:
        model: The parsed data model.
        worksheet: Excel worksheet, from which the default values will be obtained.

    """
    # Resolve the header row first: the default first data row depends on it.
    # Previously `row_from` fell back to 1 while `header_row` also defaulted
    # to 1, so the header row itself was included in the data rows.
    if model.header and not model.header_row:
        model.header_row = 1

    if model.row_from is None:
        if model.header:
            # assume that data starts on the first row after the header
            model.row_from = model.header_row + 1
        else:
            model.row_from = worksheet.min_row

    if model.row_to is None:
        model.row_to = worksheet.max_row

    # Column bounds may be given as Excel column letters; convert them to
    # integer indices.
    if model.col_from is None:
        model.col_from = worksheet.min_column
    elif isinstance(model.col_from, str):
        model.col_from = column_index_from_string(model.col_from)

    if model.col_to is None:
        model.col_to = worksheet.max_column
    elif isinstance(model.col_to, str):
        model.col_to = column_index_from_string(model.col_to)

image

Strategy class for image/jpg.

ImageDataParseStrategy dataclass

Parse strategy for images.

Registers strategies:

  • ("mediaType", "image/jpg")
  • ("mediaType", "image/jpeg")
  • ("mediaType", "image/jp2")
  • ("mediaType", "image/png")
  • ("mediaType", "image/gif")
  • ("mediaType", "image/tiff")
  • ("mediaType", "image/eps")
Source code in oteapi/strategies/parse/image.py
@dataclass
class ImageDataParseStrategy:
    """Parse strategy for images.

    **Registers strategies**:

    - `("mediaType", "image/jpg")`
    - `("mediaType", "image/jpeg")`
    - `("mediaType", "image/jp2")`
    - `("mediaType", "image/png")`
    - `("mediaType", "image/gif")`
    - `("mediaType", "image/tiff")`
    - `("mediaType", "image/eps")`

    """

    parse_config: "ResourceConfig"

    def __post_init__(self):
        """Cache the configuration, the file name and the local directory."""
        self.conf = self.parse_config.configuration
        self.filename = self.conf["filename"]
        # "localpath" is optional; fall back to the default data directory.
        self.localpath = self.conf.get("localpath", "/ote-data")

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize."""
        return SessionUpdate()

    def get(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> SessionUpdateImageParse:
        """Crop the image when a crop box is configured.

        If `session` is given, its entries are merged into the strategy
        configuration before it is inspected. When the resulting configuration
        contains a `crop` entry, the image `<localpath>/<filename>` is opened,
        cropped to that box and saved as `<localpath>/cropped_<filename>`.

        Parameters:
            session: Optional session data, merged into the configuration.

        Returns:
            A `SessionUpdateImageParse` whose `parsedOutput` always contains
            `"parseImage": "Done"` and, if cropping took place, the path of the
            cropped file under `"cropped_filename"`.

        """
        if session is not None:
            # NOTE(review): `self.conf` is the configuration object of
            # `parse_config`, so this update modifies it in place.
            self.conf.update(session)

        parsed_output = {}
        if "crop" in self.conf:
            cropped_filename = f"{self.localpath}/cropped_{self.filename}"
            # Use the image as a context manager so the underlying file is
            # closed once the cropped copy has been written.
            with Image.open(f"{self.localpath}/{self.filename}") as image:
                image.crop(tuple(self.conf["crop"])).save(cropped_filename)
            parsed_output["cropped_filename"] = cropped_filename
        parsed_output["parseImage"] = "Done"
        return SessionUpdateImageParse(parsedOutput=parsed_output)
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/parse/image.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize the image parse strategy.

    Parameters:
        session: Optional session data. It is not used by this method.

    Returns:
        A `SessionUpdate` created without any arguments.

    """
    session_update = SessionUpdate()
    return session_update

SessionUpdateImageParse (SessionUpdate) pydantic-model

Configuration model for ImageParse.

Source code in oteapi/strategies/parse/image.py
class SessionUpdateImageParse(SessionUpdate):
    """Configuration model for ImageParse."""

    # Required field. `ImageDataParseStrategy.get` fills it with the key
    # "parseImage" and, when an image was cropped, "cropped_filename".
    parsedOutput: Dict[str, str] = Field(
        ..., description="Parsed output from ImageParse."
    )
parsedOutput: Dict[str, str] pydantic-field required

Parsed output from ImageParse.

text_csv

Strategy class for text/csv.

CSVParseStrategy dataclass

Parse strategy for CSV files.

Registers strategies:

  • ("mediaType", "text/csv")
Source code in oteapi/strategies/parse/text_csv.py
@dataclass
class CSVParseStrategy:
    """Parse strategy for CSV files.

    **Registers strategies**:

    - `("mediaType", "text/csv")`

    """

    parse_config: "ResourceConfig"

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize.

        Parameters:
            session: Optional session data. It is not used by this method.

        Returns:
            A `SessionUpdate` created without any arguments.

        """
        session_update = SessionUpdate()
        return session_update

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Parse CSV.

        Parameters:
            session: Optional session data. It is not used by this method.

        Returns:
            A `SessionUpdate` created without any arguments.

        """
        session_update = SessionUpdate()
        return session_update
get(self, session=None)

Parse CSV.

Source code in oteapi/strategies/parse/text_csv.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Parse CSV.

    Parameters:
        session: Optional session data. It is not used by this method.

    Returns:
        A `SessionUpdate` created without any arguments.

    """
    session_update = SessionUpdate()
    return session_update
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/parse/text_csv.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize the CSV parse strategy.

    Parameters:
        session: Optional session data. It is not used by this method.

    Returns:
        A `SessionUpdate` created without any arguments.

    """
    session_update = SessionUpdate()
    return session_update

transformation special

celery_remote

Transformation Plugin that uses the Celery framework to call remote workers.

CeleryConfig (BaseModel) pydantic-model

Celery configuration.

Source code in oteapi/strategies/transformation/celery_remote.py
class CeleryConfig(BaseModel):
    """Celery configuration."""

    # Both fields are required. `CeleryRemoteStrategy.run` passes `task_name`
    # as the task name and `args` as the positional arguments of
    # `app.send_task`.
    task_name: str = Field(..., description="A task name.")
    args: list = Field(..., description="List of arguments for the task.")
args: list pydantic-field required

List of arguments for the task.

task_name: str pydantic-field required

A task name.

CeleryRemoteStrategy dataclass

Submit job to remote Celery runner.

Registers strategies:

  • ("transformationType", "celery/remote")
Source code in oteapi/strategies/transformation/celery_remote.py
@dataclass
class CeleryRemoteStrategy:
    """Submit job to remote Celery runner.

    **Registers strategies**:

    - `("transformationType", "celery/remote")`

    """

    transformation_config: "TransformationConfig"

    def run(self, session: "Optional[Dict[str, Any]]" = None) -> TransformationStatus:
        """Run a job, return a job ID.

        Parameters:
            session: Optional session data, forwarded to the task as its
                keyword arguments.

        Returns:
            A `TransformationStatus` holding the ID of the submitted task and
            its current state.

        """
        config = self.transformation_config.configuration
        # NOTE(review): `CeleryConfig` declares `task_name` and `args` as
        # required, so the argument-less call presumably fails validation when
        # no configuration is given - confirm that this is the intended error.
        celery_config = CeleryConfig() if config is None else CeleryConfig(**config)
        result: "Union[AsyncResult, Any]" = app.send_task(
            celery_config.task_name, celery_config.args, kwargs=session
        )
        status = AsyncResult(id=result.task_id, app=app)
        # Read `state`, as `status()` does; `AsyncResult.status` is only a
        # deprecated alias for it.
        return TransformationStatus(id=result.task_id, status=status.state)

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize a job."""
        return SessionUpdate()

    def status(self, task_id: str) -> TransformationStatus:
        """Get job status.

        Parameters:
            task_id: ID of the task to look up.

        Returns:
            A `TransformationStatus` holding `task_id` and the task's state.

        """
        result = AsyncResult(id=task_id, app=app)
        return TransformationStatus(id=task_id, status=result.state)

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "SessionUpdateCelery":
        """Get transformation."""
        # TODO: update and return global state  # pylint: disable=fixme
        return SessionUpdateCelery(data={})
get(self, session=None)

Get transformation.

Source code in oteapi/strategies/transformation/celery_remote.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "SessionUpdateCelery":
    """Get transformation.

    Parameters:
        session: Optional session data. It is not used by this method.

    Returns:
        A `SessionUpdateCelery` whose `data` is an empty dict.

    """
    # TODO: update and return global state  # pylint: disable=fixme
    session_update = SessionUpdateCelery(data={})
    return session_update
initialize(self, session=None)

Initialize a job.

Source code in oteapi/strategies/transformation/celery_remote.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize a job.

    Parameters:
        session: Optional session data. It is not used by this method.

    Returns:
        A `SessionUpdate` created without any arguments.

    """
    session_update = SessionUpdate()
    return session_update
run(self, session=None)

Run a job, return a job ID.

Source code in oteapi/strategies/transformation/celery_remote.py
def run(self, session: "Optional[Dict[str, Any]]" = None) -> TransformationStatus:
    """Run a job, return a job ID.

    Parameters:
        session: Optional session data, forwarded to the task as its keyword
            arguments.

    Returns:
        A `TransformationStatus` holding the ID of the submitted task and its
        current state.

    """
    config = self.transformation_config.configuration
    celery_config = CeleryConfig() if config is None else CeleryConfig(**config)
    result: "Union[AsyncResult, Any]" = app.send_task(
        celery_config.task_name, celery_config.args, kwargs=session
    )
    status = AsyncResult(id=result.task_id, app=app)
    # Read `state` rather than `status`, which is only a deprecated alias.
    return TransformationStatus(id=result.task_id, status=status.state)
status(self, task_id)

Get job status.

Source code in oteapi/strategies/transformation/celery_remote.py
def status(self, task_id: str) -> TransformationStatus:
    """Look up the state of a job.

    Parameters:
        task_id: ID of the task to look up.

    Returns:
        A `TransformationStatus` holding `task_id` and the task's state.

    """
    async_result = AsyncResult(id=task_id, app=app)
    current_state = async_result.state
    return TransformationStatus(id=task_id, status=current_state)

SessionUpdateCelery (SessionUpdate) pydantic-model

Class for returning values from the Celery remote transformation strategy.

Source code in oteapi/strategies/transformation/celery_remote.py
class SessionUpdateCelery(SessionUpdate):
    """Class for returning values from the Celery remote strategy."""

    # NOTE(review): the description below speaks of column-name/column-value
    # pairs and looks carried over from the XLSX parse model. The only value
    # visibly stored here is the empty dict from `CeleryRemoteStrategy.get` -
    # confirm the intended contents before rewording the description.
    data: Dict[str, list] = Field(
        ...,
        description="A dict with column-name/column-value pairs. The values are lists.",
    )
data: Dict[str, list] pydantic-field required

A dict with column-name/column-value pairs. The values are lists.

Back to top