Skip to content

OTE-API Core Strategies

This page provides documentation for the oteapi.strategies submodule, where all the core OTE-API strategies are located.

These strategies will always be available when setting up a server based on the OTE-API Core package.

download special

file

Download strategy class for the file scheme.

FileConfig (BaseModel) pydantic-model

File-specific Configuration Data Model.

Source code in oteapi/strategies/download/file.py
class FileConfig(BaseModel):
    """File-specific Configuration Data Model."""

    text: bool = Field(
        False,
        description=(
            "Whether the file should be opened in text mode. If `False`, the file will"
            " be opened in bytes mode."
        ),
    )
    encoding: Optional[str] = Field(
        None,
        description=(
            "Encoding used when opening the file. The default is platform dependent."
        ),
    )
encoding: str pydantic-field

Encoding used when opening the file. The default is platform dependent.

text: bool pydantic-field

Whether the file should be opened in text mode. If False, the file will be opened in bytes mode.

FileStrategy dataclass

Strategy for retrieving data from a local file.

Registers strategies:

  • ("scheme", "file")
Source code in oteapi/strategies/download/file.py
@dataclass
@StrategyFactory.register(("scheme", "file"))
class FileStrategy:
    """Strategy for retrieving data from a local file.

    **Registers strategies**:

    - `("scheme", "file")`

    """

    resource_config: "ResourceConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize."""
        return {}

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Read local file."""
        if (
            self.resource_config.downloadUrl is None
            or self.resource_config.downloadUrl.scheme != "file"
        ):
            raise ValueError(
                "Expected 'downloadUrl' to have scheme 'file' in the configuration."
            )

        filename = Path(self.resource_config.downloadUrl.host).resolve()

        cache = DataCache(self.resource_config.configuration)
        if cache.config.accessKey and cache.config.accessKey in cache:
            key = cache.config.accessKey
        else:
            config = FileConfig(
                **self.resource_config.configuration, extra=Extra.ignore
            )
            key = cache.add(
                filename.read_text(encoding=config.encoding)
                if config.text
                else filename.read_bytes()
            )

        return {"key": key}
get(self, session=None)

Read local file.

Source code in oteapi/strategies/download/file.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Read local file."""
    if (
        self.resource_config.downloadUrl is None
        or self.resource_config.downloadUrl.scheme != "file"
    ):
        raise ValueError(
            "Expected 'downloadUrl' to have scheme 'file' in the configuration."
        )

    filename = Path(self.resource_config.downloadUrl.host).resolve()

    cache = DataCache(self.resource_config.configuration)
    if cache.config.accessKey and cache.config.accessKey in cache:
        key = cache.config.accessKey
    else:
        config = FileConfig(
            **self.resource_config.configuration, extra=Extra.ignore
        )
        key = cache.add(
            filename.read_text(encoding=config.encoding)
            if config.text
            else filename.read_bytes()
        )

    return {"key": key}
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/download/file.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize."""
    return {}

https

Download strategy class for http/https

HTTPSStrategy dataclass

Strategy for retrieving data via http.

Registers strategies:

  • ("scheme", "http")
  • ("scheme", "https")
Source code in oteapi/strategies/download/https.py
@dataclass
@StrategyFactory.register(("scheme", "http"), ("scheme", "https"))
class HTTPSStrategy:
    """Strategy for retrieving data via http.

    **Registers strategies**:

    - `("scheme", "http")`
    - `("scheme", "https")`

    """

    resource_config: "ResourceConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize."""
        return {}

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Download via http/https and store on local cache."""
        cache = DataCache(self.resource_config.configuration)
        if cache.config.accessKey and cache.config.accessKey in cache:
            key = cache.config.accessKey
        else:
            if not self.resource_config.downloadUrl:
                raise ValueError("downloadUrl not defined in configuration.")
            req = requests.get(self.resource_config.downloadUrl, allow_redirects=True)
            key = cache.add(req.content)

        return {"key": key}
get(self, session=None)

Download via http/https and store on local cache.

Source code in oteapi/strategies/download/https.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Download via http/https and store on local cache."""
    cache = DataCache(self.resource_config.configuration)
    if cache.config.accessKey and cache.config.accessKey in cache:
        key = cache.config.accessKey
    else:
        if not self.resource_config.downloadUrl:
            raise ValueError("downloadUrl not defined in configuration.")
        req = requests.get(self.resource_config.downloadUrl, allow_redirects=True)
        key = cache.add(req.content)

    return {"key": key}
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/download/https.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize."""
    return {}

sftp

Strategy class for sftp/ftp

SFTPStrategy dataclass

Strategy for retrieving data via sftp.

Registers strategies:

  • ("scheme", "ftp")
  • ("scheme", "sftp")
Source code in oteapi/strategies/download/sftp.py
@dataclass
@StrategyFactory.register(("scheme", "sftp"), ("scheme", "ftp"))
class SFTPStrategy:
    """Strategy for retrieving data via sftp.

    **Registers strategies**:

    - `("scheme", "ftp")`
    - `("scheme", "sftp")`

    """

    resource_config: "ResourceConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize."""
        return {}

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Download via sftp"""
        cache = DataCache(self.resource_config.configuration)
        if cache.config.accessKey and cache.config.accessKey in cache:
            key = cache.config.accessKey
        else:
            # Setup connection options
            cnopts = pysftp.CnOpts()
            cnopts.hostkeys = None

            if not self.resource_config.accessUrl:
                raise ValueError("accessUrl is not defined in configuration.")

            # open connection and store data locally
            with pysftp.Connection(
                host=self.resource_config.accessUrl.host,
                username=self.resource_config.accessUrl.user,
                password=self.resource_config.accessUrl.password,
                port=self.resource_config.accessUrl.port,
                cnopts=cnopts,
            ) as sftp:
                # Because of insane locking on Windows, we have to close
                # the downloaded file before adding it to the cache
                with NamedTemporaryFile(prefix="oteapi-sftp-", delete=False) as handle:
                    localpath = Path(handle.name).resolve()
                try:
                    sftp.get(self.resource_config.accessUrl.path, localpath=localpath)
                    key = cache.add(localpath.read_bytes())
                finally:
                    localpath.unlink()

        return {"key": key}
get(self, session=None)

Download via sftp

Source code in oteapi/strategies/download/sftp.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Download via sftp"""
    cache = DataCache(self.resource_config.configuration)
    if cache.config.accessKey and cache.config.accessKey in cache:
        key = cache.config.accessKey
    else:
        # Setup connection options
        cnopts = pysftp.CnOpts()
        cnopts.hostkeys = None

        if not self.resource_config.accessUrl:
            raise ValueError("accessUrl is not defined in configuration.")

        # open connection and store data locally
        with pysftp.Connection(
            host=self.resource_config.accessUrl.host,
            username=self.resource_config.accessUrl.user,
            password=self.resource_config.accessUrl.password,
            port=self.resource_config.accessUrl.port,
            cnopts=cnopts,
        ) as sftp:
            # Because of insane locking on Windows, we have to close
            # the downloaded file before adding it to the cache
            with NamedTemporaryFile(prefix="oteapi-sftp-", delete=False) as handle:
                localpath = Path(handle.name).resolve()
            try:
                sftp.get(self.resource_config.accessUrl.path, localpath=localpath)
                key = cache.add(localpath.read_bytes())
            finally:
                localpath.unlink()

    return {"key": key}
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/download/sftp.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize."""
    return {}

filter special

crop_filter

Demo-filter strategy

CropDataModel (BaseModel) pydantic-model

Configuration model for crop data.

Source code in oteapi/strategies/filter/crop_filter.py
class CropDataModel(BaseModel):
    """Configuration model for crop data."""

    crop: List[int] = Field(..., description="List of image cropping details.")
crop: List[int] pydantic-field required

List of image cropping details.

CropFilter dataclass

Strategy for cropping an image.

Registers strategies:

  • ("filterType", "filter/crop")
Source code in oteapi/strategies/filter/crop_filter.py
@dataclass
@StrategyFactory.register(("filterType", "filter/crop"))
class CropFilter:
    """Strategy for cropping an image.

    **Registers strategies**:

    - `("filterType", "filter/crop")`

    """

    filter_config: "FilterConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize strategy and return a dictionary."""
        return {"result": "collectionid"}

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Execute strategy and return a dictionary"""
        cropData = (
            CropDataModel(**self.filter_config.configuration)
            if self.filter_config.configuration
            else CropDataModel()
        )
        return {"imagecrop": cropData.crop}
get(self, session=None)

Execute strategy and return a dictionary

Source code in oteapi/strategies/filter/crop_filter.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Execute strategy and return a dictionary"""
    cropData = (
        CropDataModel(**self.filter_config.configuration)
        if self.filter_config.configuration
        else CropDataModel()
    )
    return {"imagecrop": cropData.crop}
initialize(self, session=None)

Initialize strategy and return a dictionary.

Source code in oteapi/strategies/filter/crop_filter.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize strategy and return a dictionary."""
    return {"result": "collectionid"}

sql_query_filter

SQL query filter strategy.

SQLQueryFilter dataclass

Strategy for a SQL query filter.

Registers strategies:

  • ("filterType", "filter/sql")
Source code in oteapi/strategies/filter/sql_query_filter.py
@dataclass
@StrategyFactory.register(("filterType", "filter/sql"))
class SQLQueryFilter:
    """Strategy for a SQL query filter.

    **Registers strategies**:

    - `("filterType", "filter/sql")`

    """

    filter_config: "FilterConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize strategy and return a dictionary"""
        queryData = SqlQueryDataModel(**{"query": self.filter_config.query})
        return {"sqlquery": queryData.query}

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Execute strategy and return a dictionary"""
        return {}
get(self, session=None)

Execute strategy and return a dictionary

Source code in oteapi/strategies/filter/sql_query_filter.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Execute strategy and return a dictionary"""
    return {}
initialize(self, session=None)

Initialize strategy and return a dictionary

Source code in oteapi/strategies/filter/sql_query_filter.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize strategy and return a dictionary"""
    queryData = SqlQueryDataModel(**{"query": self.filter_config.query})
    return {"sqlquery": queryData.query}

SqlQueryDataModel (BaseModel) pydantic-model

SQL Query data model.

Source code in oteapi/strategies/filter/sql_query_filter.py
class SqlQueryDataModel(BaseModel):
    """SQL Query data model."""

    query: str = Field(..., description="A SQL query string.")
query: str pydantic-field required

A SQL query string.

parse special

application_vnd_sqlite

Strategy class for application/vnd.sqlite3.

SqliteParseStrategy dataclass

Parse strategy for SQLite.

Registers strategies:

  • ("mediaType", "application/vnd.sqlite3")
Source code in oteapi/strategies/parse/application_vnd_sqlite.py
@dataclass
@StrategyFactory.register(("mediaType", "application/vnd.sqlite3"))
class SqliteParseStrategy:
    """Parse strategy for SQLite.

    **Registers strategies**:

    - `("mediaType", "application/vnd.sqlite3")`

    """

    resource_config: "ResourceConfig"

    def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Parse SQLite query responses."""
        if session is None:
            raise ValueError("Missing session")

        if "sqlquery" in session:
            cn = create_connection(session["filename"])
            cur = cn.cursor()
            rows = cur.execute(session["sqlquery"]).fetchall()
            return {"result": rows}
        return {"result": "No query given"}

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize."""
        return {}
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize."""
    return {}
parse(self, session=None)

Parse SQLite query responses.

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Parse SQLite query responses."""
    if session is None:
        raise ValueError("Missing session")

    if "sqlquery" in session:
        cn = create_connection(session["filename"])
        cur = cn.cursor()
        rows = cur.execute(session["sqlquery"]).fetchall()
        return {"result": rows}
    return {"result": "No query given"}

create_connection(db_file)

create a database connection to the SQLite database specified by db_file :param db_file: database file :return: Connection object or None

Source code in oteapi/strategies/parse/application_vnd_sqlite.py
def create_connection(db_file):
    """create a database connection to the SQLite database
        specified by db_file
    :param db_file: database file
    :return: Connection object or None
    """
    conn = None
    try:
        conn = sqlite3.connect(db_file)
        return conn
    except sqlite3.Error as exc:
        print(exc)

    return conn

excel_xlsx

Strategy class for workbook/xlsx.

XLSXParseDataModel (BaseModel) pydantic-model

Data model for retrieving a rectangular section of an Excel sheet.

Source code in oteapi/strategies/parse/excel_xlsx.py
class XLSXParseDataModel(BaseModel):
    """Data model for retrieving a rectangular section of an Excel sheet."""

    worksheet: str = Field(..., description="Name of worksheet to load.")
    row_from: Optional[int] = Field(
        None,
        description="Excel row number of first row. Defaults to first assigned row.",
    )
    col_from: Optional[Union[int, str]] = Field(
        None,
        description=(
            "Excel column number or label of first column. Defaults to first assigned "
            "column."
        ),
    )
    row_to: Optional[int] = Field(
        None, description="Excel row number of last row. Defaults to last assigned row."
    )
    col_to: Optional[Union[int, str]] = Field(
        None,
        description=(
            "Excel column number or label of last column. Defaults to last assigned "
            "column."
        ),
    )
    header_row: Optional[int] = Field(
        None,
        description=(
            "Row number with the headers. Defaults to `1` if header is given, "
            "otherwise `None`."
        ),
    )
    header: Optional[List[str]] = Field(
        None,
        description=(
            "Optional list of column names, specifying the columns to return. "
            "These names they should match cells in `header_row`."
        ),
    )
    new_header: Optional[List[str]] = Field(
        None,
        description=(
            "Optional list of new column names replacing `header` in the output."
        ),
    )
col_from: Union[int, str] pydantic-field

Excel column number or label of first column. Defaults to first assigned column.

col_to: Union[int, str] pydantic-field

Excel column number or label of last column. Defaults to last assigned column.

header: List[str] pydantic-field

Optional list of column names, specifying the columns to return. These names they should match cells in header_row.

header_row: int pydantic-field

Row number with the headers. Defaults to 1 if header is given, otherwise None.

new_header: List[str] pydantic-field

Optional list of new column names replacing header in the output.

row_from: int pydantic-field

Excel row number of first row. Defaults to first assigned row.

row_to: int pydantic-field

Excel row number of last row. Defaults to last assigned row.

worksheet: str pydantic-field required

Name of worksheet to load.

XLSXParseStrategy dataclass

Parse strategy for Excel XLSX files.

Registers strategies:

  • ("mediaType", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
Source code in oteapi/strategies/parse/excel_xlsx.py
@dataclass
@StrategyFactory.register(
    ("mediaType", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
)
class XLSXParseStrategy:
    """Parse strategy for Excel XLSX files.

    **Registers strategies**:

    - `("mediaType", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")`

    """

    resource_config: "ResourceConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize."""
        return {}

    def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Parses selected region of an excel file.

        Returns:
            A dict with column-name/column-value pairs. The values are lists.

        """
        model = XLSXParseDataModel(
            **self.resource_config.configuration, extra=Extra.ignore
        )

        downloader = create_download_strategy(self.resource_config)
        output = downloader.get()
        cache = DataCache(self.resource_config.configuration)
        with cache.getfile(key=output["key"], suffix=".xlsx") as filename:
            workbook = load_workbook(filename=filename, read_only=True, data_only=True)
        worksheet = workbook[model.worksheet]
        set_model_defaults(model, worksheet)
        columns = get_column_indices(model, worksheet)

        data = []
        for row in worksheet.iter_rows(
            min_row=model.row_from,
            max_row=model.row_to,
            min_col=min(columns),
            max_col=max(columns),
        ):
            data.append([row[c - 1].value for c in columns])

        if model.header_row:
            row = worksheet.iter_rows(
                min_row=model.header_row,
                max_row=model.header_row,
                min_col=min(columns),
                max_col=max(columns),
            ).__next__()
            header = [row[c - 1].value for c in columns]
        else:
            header = None

        if model.new_header:
            nhead = len(header) if header else len(data[0]) if data else 0
            if len(model.new_header) != nhead:
                raise TypeError(
                    f"length of `new_header` (={len(model.new_header)}) "
                    f"doesn't match number of columns (={len(header) if header else 0})"
                )
            if header:
                for i, val in enumerate(model.new_header):
                    if val is not None:
                        header[i] = val
            elif data:
                header = model.new_header

        if header is None:
            header = [get_column_letter(col + 1) for col in range(len(data))]

        transposed = list(map(list, zip(*data)))
        return {k: v for k, v in zip(header, transposed)}
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/parse/excel_xlsx.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize."""
    return {}
parse(self, session=None)

Parses selected region of an excel file.

Returns:

Type Description
Dict[str, Any]

A dict with column-name/column-value pairs. The values are lists.

Source code in oteapi/strategies/parse/excel_xlsx.py
def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Parses selected region of an excel file.

    Returns:
        A dict with column-name/column-value pairs. The values are lists.

    """
    model = XLSXParseDataModel(
        **self.resource_config.configuration, extra=Extra.ignore
    )

    downloader = create_download_strategy(self.resource_config)
    output = downloader.get()
    cache = DataCache(self.resource_config.configuration)
    with cache.getfile(key=output["key"], suffix=".xlsx") as filename:
        workbook = load_workbook(filename=filename, read_only=True, data_only=True)
    worksheet = workbook[model.worksheet]
    set_model_defaults(model, worksheet)
    columns = get_column_indices(model, worksheet)

    data = []
    for row in worksheet.iter_rows(
        min_row=model.row_from,
        max_row=model.row_to,
        min_col=min(columns),
        max_col=max(columns),
    ):
        data.append([row[c - 1].value for c in columns])

    if model.header_row:
        row = worksheet.iter_rows(
            min_row=model.header_row,
            max_row=model.header_row,
            min_col=min(columns),
            max_col=max(columns),
        ).__next__()
        header = [row[c - 1].value for c in columns]
    else:
        header = None

    if model.new_header:
        nhead = len(header) if header else len(data[0]) if data else 0
        if len(model.new_header) != nhead:
            raise TypeError(
                f"length of `new_header` (={len(model.new_header)}) "
                f"doesn't match number of columns (={len(header) if header else 0})"
            )
        if header:
            for i, val in enumerate(model.new_header):
                if val is not None:
                    header[i] = val
        elif data:
            header = model.new_header

    if header is None:
        header = [get_column_letter(col + 1) for col in range(len(data))]

    transposed = list(map(list, zip(*data)))
    return {k: v for k, v in zip(header, transposed)}

get_column_indices(model, worksheet)

Helper function returning a list of column indices.

Parameters:

Name Type Description Default
model XLSXParseDataModel

The parsed data model.

required
worksheet Worksheet

Excel worksheet, from which the header values will be retrieved.

required

Returns:

Type Description
Iterable[int]

A list of column indices.

Source code in oteapi/strategies/parse/excel_xlsx.py
def get_column_indices(
    model: XLSXParseDataModel, worksheet: "Worksheet"
) -> "Iterable[int]":
    """Helper function returning a list of column indices.

    Parameters:
        model: The parsed data model.
        worksheet: Excel worksheet, from which the header values will be retrieved.

    Returns:
        A list of column indices.

    """
    if not isinstance(model.col_from, int) or not isinstance(model.col_to, int):
        raise TypeError("Expected `model.col_from` and `model.col_to` to be integers.")

    if model.header:
        header_dict = {
            worksheet.cell(model.header_row, col).value: col
            for col in range(model.col_from, model.col_to + 1)
        }
        return [header_dict[h] for h in model.header]
    return range(model.col_from, model.col_to + 1)

set_model_defaults(model, worksheet)

Update data model model with default values obtained from worksheet.

Parameters:

Name Type Description Default
model XLSXParseDataModel

The parsed data model.

required
worksheet Worksheet

Excel worksheet, from which the default values will be obtained.

required
Source code in oteapi/strategies/parse/excel_xlsx.py
def set_model_defaults(model: XLSXParseDataModel, worksheet: "Worksheet") -> None:
    """Update data model `model` with default values obtained from `worksheet`.

    Parameters:
        model: The parsed data model.
        worksheet: Excel worksheet, from which the default values will be obtained.

    """
    if model.row_from is None:
        if model.header:
            # assume that data starts on the first row after the header
            model.row_from = model.header_row + 1 if model.header_row else 1
        else:
            model.row_from = worksheet.min_row

    if model.row_to is None:
        model.row_to = worksheet.max_row

    if model.col_from is None:
        model.col_from = worksheet.min_column
    elif isinstance(model.col_from, str):
        model.col_from = column_index_from_string(model.col_from)

    if model.col_to is None:
        model.col_to = worksheet.max_column
    elif isinstance(model.col_to, str):
        model.col_to = column_index_from_string(model.col_to)

    if model.header and not model.header_row:
        model.header_row = 1

image_jpeg

Strategy class for image/jpg.

ImageDataParseStrategy dataclass

Parse strategy for images.

Registers strategies:

  • ("mediaType", "image/jpg")
  • ("mediaType", "image/jpeg")
  • ("mediaType", "image/j2p")
  • ("mediaType", "image/png")
  • ("mediaType", "image/gif")
  • ("mediaType", "image/tiff")
  • ("mediaType", "image/eps")
Source code in oteapi/strategies/parse/image_jpeg.py
@dataclass
@StrategyFactory.register(
    ("mediaType", "image/jpg"),
    ("mediaType", "image/jpeg"),
    ("mediaType", "image/j2p"),
    ("mediaType", "image/png"),
    ("mediaType", "image/gif"),
    ("mediaType", "image/tiff"),
    ("mediaType", "image/eps"),
)
class ImageDataParseStrategy:
    """Parse strategy for images.

    **Registers strategies**:

    - `("mediaType", "image/jpg")`
    - `("mediaType", "image/jpeg")`
    - `("mediaType", "image/j2p")`
    - `("mediaType", "image/png")`
    - `("mediaType", "image/gif")`
    - `("mediaType", "image/tiff")`
    - `("mediaType", "image/eps")`

    """

    resource_config: "ResourceConfig"

    def __post_init__(self):
        self.localpath = "/ote-data"
        self.filename = self.resource_config.configuration["artifactName"]
        if self.resource_config.configuration:
            self.conf = self.resource_config.configuration
        else:
            self.conf = {}

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize."""
        return {}

    def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        if session is not None:
            self.conf.update(session)
        parsedOutput = {}
        if "crop" in self.conf:
            print("cropping!")
            im = Image.open(f"{self.localpath}/{self.filename}")
            crop = self.conf["crop"]
            im_cropped = im.crop(tuple(crop))
            cropped_filename = f"{self.localpath}/cropped_{self.filename}"
            im_cropped.save(cropped_filename)
            parsedOutput["cropped_filename"] = cropped_filename
        parsedOutput["parseImage"] = "Done"
        return parsedOutput
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/parse/image_jpeg.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize."""
    return {}

text_csv

Strategy class for text/csv.

CSVParseStrategy dataclass

Parse strategy for CSV files.

Registers strategies:

  • ("mediaType", "text/csv")
Source code in oteapi/strategies/parse/text_csv.py
@dataclass
@StrategyFactory.register(("mediaType", "text/csv"))
class CSVParseStrategy:
    """Parse strategy for CSV files.

    **Registers strategies**:

    - `("mediaType", "text/csv")`

    """

    resource_config: "ResourceConfig"

    def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Parse CSV."""
        print("CSV in action!")
        return {}

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize."""
        return {}
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/parse/text_csv.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize."""
    return {}
parse(self, session=None)

Parse CSV.

Source code in oteapi/strategies/parse/text_csv.py
def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Parse CSV."""
    print("CSV in action!")
    return {}

text_json

Strategy class for text/json.

JSONDataParseStrategy dataclass

Parse strategy for JSON.

Registers strategies:

  • ("mediaType", "text/json")
Source code in oteapi/strategies/parse/text_json.py
@dataclass
@StrategyFactory.register(("mediaType", "text/json"))
class JSONDataParseStrategy:
    """Parse strategy for JSON.

    **Registers strategies**:

    - `("mediaType", "text/json")`

    """

    resource_config: "ResourceConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize."""
        return {}

    def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Parse json."""
        downloader = create_download_strategy(self.resource_config)
        output = downloader.get()
        cache = DataCache(self.resource_config.configuration)
        content = cache.get(output["key"])

        if isinstance(content, dict):
            return content
        return json.loads(content)
initialize(self, session=None)

Initialize.

Source code in oteapi/strategies/parse/text_json.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize."""
    return {}
parse(self, session=None)

Parse json.

Source code in oteapi/strategies/parse/text_json.py
def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Parse json."""
    downloader = create_download_strategy(self.resource_config)
    output = downloader.get()
    cache = DataCache(self.resource_config.configuration)
    content = cache.get(output["key"])

    if isinstance(content, dict):
        return content
    return json.loads(content)

transformation special

celery_remote

Transformation Plugin that uses the Celery framework to call remote workers.

CeleryConfig (BaseModel) pydantic-model

Celery configuration.

Source code in oteapi/strategies/transformation/celery_remote.py
class CeleryConfig(BaseModel):
    """Celery configuration."""

    taskName: str = Field(..., description="A task name.")
    args: List[Any] = Field(..., description="List of arguments for the task.")
args: List[Any] pydantic-field required

List of arguments for the task.

taskName: str pydantic-field required

A task name.

CeleryRemoteStrategy dataclass

Submit job to remote Celery runner.

Registers strategies:

  • ("transformation_type", "celery/remote")
Source code in oteapi/strategies/transformation/celery_remote.py
@dataclass
@StrategyFactory.register(("transformation_type", "celery/remote"))
class CeleryRemoteStrategy:
    """Submit job to remote Celery runner.

    **Registers strategies**:

    - `("transformation_type", "celery/remote")`

    """

    transformation_config: "TransformationConfig"

    def run(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Run a job, return a job ID."""
        config = self.transformation_config.configuration
        celeryConfig = CeleryConfig() if config is None else CeleryConfig(**config)
        result = app.send_task(celeryConfig.taskName, celeryConfig.args, kwargs=session)
        return {"result": result.task_id}

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize a job."""
        return {}

    def status(self, task_id: str) -> TransformationStatus:
        """Get job status."""
        result = AsyncResult(id=task_id, app=app)
        return TransformationStatus(id=task_id, status=result.state)

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Get transformation."""
        # TODO: update and return global state  # pylint: disable=fixme
        return {}
get(self, session=None)

Get transformation.

Source code in oteapi/strategies/transformation/celery_remote.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Get transformation."""
    # TODO: update and return global state  # pylint: disable=fixme
    return {}
initialize(self, session=None)

Initialize a job.

Source code in oteapi/strategies/transformation/celery_remote.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize a job."""
    return {}
run(self, session=None)

Run a job, return a job ID.

Source code in oteapi/strategies/transformation/celery_remote.py
def run(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Run a job, return a job ID."""
    config = self.transformation_config.configuration
    celeryConfig = CeleryConfig() if config is None else CeleryConfig(**config)
    result = app.send_task(celeryConfig.taskName, celeryConfig.args, kwargs=session)
    return {"result": result.task_id}
status(self, task_id)

Get job status.

Source code in oteapi/strategies/transformation/celery_remote.py
def status(self, task_id: str) -> TransformationStatus:
    """Get job status."""
    result = AsyncResult(id=task_id, app=app)
    return TransformationStatus(id=task_id, status=result.state)
Back to top