OTE-API Core Strategies¶
This page provides documentation for the oteapi.strategies
submodule, where all the core OTE-API strategies are located.
These strategies will always be available when setting up a server based on the OTE-API Core package.
download
special
¶
file
¶
Download strategy class for the file
scheme.
FileConfig (BaseModel)
pydantic-model
¶
File-specific Configuration Data Model.
Source code in oteapi/strategies/download/file.py
class FileConfig(BaseModel):
    """Configuration options understood by the `file` download strategy."""

    # Selects between reading the file as text or as raw bytes.
    text: bool = Field(
        default=False,
        description=(
            "Whether the file should be opened in text mode. "
            "If `False`, the file will be opened in bytes mode."
        ),
    )

    # Only consulted when the file is read in text mode.
    encoding: Optional[str] = Field(
        default=None,
        description="Encoding used when opening the file. The default is platform dependent.",
    )
FileStrategy
dataclass
¶
Strategy for retrieving data from a local file.
Registers strategies:
("scheme", "file")
Source code in oteapi/strategies/download/file.py
@dataclass
@StrategyFactory.register(("scheme", "file"))
class FileStrategy:
    """Strategy for retrieving data from a local file.

    **Registers strategies**:

    - `("scheme", "file")`

    """

    resource_config: "ResourceConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize.

        Parameters:
            session: Optional session data; not used by this strategy.

        Returns:
            An empty dictionary.

        """
        return {}

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Read local file.

        The file content is added to the data cache, unless the configured
        `accessKey` is already present in the cache.

        Parameters:
            session: Optional session data; not used by this strategy.

        Returns:
            A dictionary with the cache key of the file content under `"key"`.

        Raises:
            ValueError: If `downloadUrl` is missing or its scheme is not `file`.

        """
        url = self.resource_config.downloadUrl
        if url is None or url.scheme != "file":
            raise ValueError(
                "Expected 'downloadUrl' to have scheme 'file' in the configuration."
            )

        # NOTE(review): the location is taken from the URL's `host` part rather
        # than its `path` - confirm this matches how callers build `downloadUrl`.
        filename = Path(url.host).resolve()

        cache = DataCache(self.resource_config.configuration)
        if cache.config.accessKey and cache.config.accessKey in cache:
            return {"key": cache.config.accessKey}

        # Fall back to an empty mapping: unpacking `None` with `**` would raise
        # a `TypeError` before `FileConfig` could apply its defaults.
        config = FileConfig(
            **(self.resource_config.configuration or {}), extra=Extra.ignore
        )
        content = (
            filename.read_text(encoding=config.encoding)
            if config.text
            else filename.read_bytes()
        )
        return {"key": cache.add(content)}
get(self, session=None)
¶
Read local file.
Source code in oteapi/strategies/download/file.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Read local file.

    The file content is added to the data cache, unless the configured
    `accessKey` is already present in the cache.

    Parameters:
        session: Optional session data; not used by this strategy.

    Returns:
        A dictionary with the cache key of the file content under `"key"`.

    Raises:
        ValueError: If `downloadUrl` is missing or its scheme is not `file`.

    """
    url = self.resource_config.downloadUrl
    if url is None or url.scheme != "file":
        raise ValueError(
            "Expected 'downloadUrl' to have scheme 'file' in the configuration."
        )

    # NOTE(review): the location is taken from the URL's `host` part rather
    # than its `path` - confirm this matches how callers build `downloadUrl`.
    filename = Path(url.host).resolve()

    cache = DataCache(self.resource_config.configuration)
    if cache.config.accessKey and cache.config.accessKey in cache:
        return {"key": cache.config.accessKey}

    # Fall back to an empty mapping: unpacking `None` with `**` would raise
    # a `TypeError` before `FileConfig` could apply its defaults.
    config = FileConfig(
        **(self.resource_config.configuration or {}), extra=Extra.ignore
    )
    content = (
        filename.read_text(encoding=config.encoding)
        if config.text
        else filename.read_bytes()
    )
    return {"key": cache.add(content)}
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/download/file.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Prepare the file strategy; there is nothing to set up.

    Parameters:
        session: Optional session data; ignored.

    Returns:
        A new, empty dictionary.

    """
    return dict()
https
¶
Download strategy class for http/https
HTTPSStrategy
dataclass
¶
Strategy for retrieving data via http.
Registers strategies:
("scheme", "http")
("scheme", "https")
Source code in oteapi/strategies/download/https.py
@dataclass
@StrategyFactory.register(("scheme", "http"), ("scheme", "https"))
class HTTPSStrategy:
    """Strategy for retrieving data via http.

    **Registers strategies**:

    - `("scheme", "http")`
    - `("scheme", "https")`

    """

    resource_config: "ResourceConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize.

        Parameters:
            session: Optional session data; not used by this strategy.

        Returns:
            An empty dictionary.

        """
        return {}

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Download via http/https and store on local cache.

        Nothing is downloaded when the configured `accessKey` is already
        present in the cache.

        Parameters:
            session: Optional session data; not used by this strategy.

        Returns:
            A dictionary with the cache key of the content under `"key"`.

        Raises:
            ValueError: If `downloadUrl` is not set.
            requests.HTTPError: If the server answers with an error status.

        """
        cache = DataCache(self.resource_config.configuration)
        if cache.config.accessKey and cache.config.accessKey in cache:
            return {"key": cache.config.accessKey}

        if not self.resource_config.downloadUrl:
            raise ValueError("downloadUrl not defined in configuration.")

        response = requests.get(
            self.resource_config.downloadUrl,
            allow_redirects=True,
            # (connect, read) seconds; without a timeout an unresponsive
            # server would block this call indefinitely.
            timeout=(3, 27),
        )
        # Do not cache the body of an error response (e.g. a 404 page) as data.
        response.raise_for_status()
        return {"key": cache.add(response.content)}
get(self, session=None)
¶
Download via http/https and store on local cache.
Source code in oteapi/strategies/download/https.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Download via http/https and store on local cache.

    Nothing is downloaded when the configured `accessKey` is already
    present in the cache.

    Parameters:
        session: Optional session data; not used by this strategy.

    Returns:
        A dictionary with the cache key of the content under `"key"`.

    Raises:
        ValueError: If `downloadUrl` is not set.
        requests.HTTPError: If the server answers with an error status.

    """
    cache = DataCache(self.resource_config.configuration)
    if cache.config.accessKey and cache.config.accessKey in cache:
        return {"key": cache.config.accessKey}

    if not self.resource_config.downloadUrl:
        raise ValueError("downloadUrl not defined in configuration.")

    response = requests.get(
        self.resource_config.downloadUrl,
        allow_redirects=True,
        # (connect, read) seconds; without a timeout an unresponsive
        # server would block this call indefinitely.
        timeout=(3, 27),
    )
    # Do not cache the body of an error response (e.g. a 404 page) as data.
    response.raise_for_status()
    return {"key": cache.add(response.content)}
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/download/https.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Prepare the http/https strategy; there is nothing to set up.

    Parameters:
        session: Optional session data; ignored.

    Returns:
        A new, empty dictionary.

    """
    result: dict = {}
    return result
sftp
¶
Strategy class for sftp/ftp
SFTPStrategy
dataclass
¶
Strategy for retrieving data via sftp.
Registers strategies:
("scheme", "ftp")
("scheme", "sftp")
Source code in oteapi/strategies/download/sftp.py
@dataclass
@StrategyFactory.register(("scheme", "sftp"), ("scheme", "ftp"))
class SFTPStrategy:
    """Strategy for retrieving data via sftp.

    **Registers strategies**:

    - `("scheme", "ftp")`
    - `("scheme", "sftp")`

    """

    resource_config: "ResourceConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize.

        Parameters:
            session: Optional session data; not used by this strategy.

        Returns:
            An empty dictionary.

        """
        return {}

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Download via sftp

        The remote file is fetched into a temporary local file whose bytes
        are then added to the data cache. Nothing is downloaded when the
        configured `accessKey` is already present in the cache.

        Parameters:
            session: Optional session data; not used by this strategy.

        Returns:
            A dictionary with the cache key of the content under `"key"`.

        Raises:
            ValueError: If `accessUrl` is not set.

        """
        cache = DataCache(self.resource_config.configuration)
        cached_key = cache.config.accessKey
        if cached_key and cached_key in cache:
            return {"key": cached_key}

        # NOTE(security): `hostkeys = None` turns off pysftp's host key
        # verification, so the remote host's identity is not checked.
        connection_options = pysftp.CnOpts()
        connection_options.hostkeys = None

        if not self.resource_config.accessUrl:
            raise ValueError("accessUrl is not defined in configuration.")
        access_url = self.resource_config.accessUrl

        with pysftp.Connection(
            host=access_url.host,
            username=access_url.user,
            password=access_url.password,
            port=access_url.port,
            cnopts=connection_options,
        ) as sftp:
            # The temporary file is only created here and closed right away:
            # Windows file locking requires it to be closed before the
            # download is written to it and read back.
            with NamedTemporaryFile(prefix="oteapi-sftp-", delete=False) as handle:
                localpath = Path(handle.name).resolve()
            try:
                sftp.get(access_url.path, localpath=localpath)
                key = cache.add(localpath.read_bytes())
            finally:
                # `delete=False` above means the file must be removed manually.
                localpath.unlink()
        return {"key": key}
get(self, session=None)
¶
Download via sftp
Source code in oteapi/strategies/download/sftp.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Download via sftp

    The remote file is fetched into a temporary local file whose bytes
    are then added to the data cache. Nothing is downloaded when the
    configured `accessKey` is already present in the cache.

    Parameters:
        session: Optional session data; not used by this strategy.

    Returns:
        A dictionary with the cache key of the content under `"key"`.

    Raises:
        ValueError: If `accessUrl` is not set.

    """
    cache = DataCache(self.resource_config.configuration)
    cached_key = cache.config.accessKey
    if cached_key and cached_key in cache:
        return {"key": cached_key}

    # NOTE(security): `hostkeys = None` turns off pysftp's host key
    # verification, so the remote host's identity is not checked.
    connection_options = pysftp.CnOpts()
    connection_options.hostkeys = None

    if not self.resource_config.accessUrl:
        raise ValueError("accessUrl is not defined in configuration.")
    access_url = self.resource_config.accessUrl

    with pysftp.Connection(
        host=access_url.host,
        username=access_url.user,
        password=access_url.password,
        port=access_url.port,
        cnopts=connection_options,
    ) as sftp:
        # The temporary file is only created here and closed right away:
        # Windows file locking requires it to be closed before the
        # download is written to it and read back.
        with NamedTemporaryFile(prefix="oteapi-sftp-", delete=False) as handle:
            localpath = Path(handle.name).resolve()
        try:
            sftp.get(access_url.path, localpath=localpath)
            key = cache.add(localpath.read_bytes())
        finally:
            # `delete=False` above means the file must be removed manually.
            localpath.unlink()
    return {"key": key}
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/download/sftp.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Prepare the sftp strategy; there is nothing to set up.

    Parameters:
        session: Optional session data; ignored.

    Returns:
        A new, empty dictionary.

    """
    return dict()
filter
special
¶
crop_filter
¶
Demo-filter strategy
CropDataModel (BaseModel)
pydantic-model
¶
Configuration model for crop data.
Source code in oteapi/strategies/filter/crop_filter.py
class CropDataModel(BaseModel):
    """Configuration accepted by the crop filter."""

    # Required field: `...` marks it as having no default.
    crop: List[int] = Field(
        default=...,
        description="List of image cropping details.",
    )
crop: List[int]
pydantic-field
required
¶
List of image cropping details.
CropFilter
dataclass
¶
Strategy for cropping an image.
Registers strategies:
("filterType", "filter/crop")
Source code in oteapi/strategies/filter/crop_filter.py
@dataclass
@StrategyFactory.register(("filterType", "filter/crop"))
class CropFilter:
    """Strategy for cropping an image.

    **Registers strategies**:

    - `("filterType", "filter/crop")`

    """

    filter_config: "FilterConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize strategy and return a dictionary.

        Parameters:
            session: Optional session data; not used here.

        Returns:
            The fixed dictionary `{"result": "collectionid"}`.

        """
        return dict(result="collectionid")

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Execute strategy and return a dictionary

        Parameters:
            session: Optional session data; not used here.

        Returns:
            A dictionary with the configured crop values under `"imagecrop"`.

        """
        configuration = self.filter_config.configuration
        if configuration:
            crop_data = CropDataModel(**configuration)
        else:
            # NOTE(review): `CropDataModel()` is built without a `crop` value;
            # if that field is required this presumably fails validation -
            # confirm the intended behaviour for an empty configuration.
            crop_data = CropDataModel()
        return {"imagecrop": crop_data.crop}
get(self, session=None)
¶
Execute strategy and return a dictionary
Source code in oteapi/strategies/filter/crop_filter.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Execute strategy and return a dictionary

    Parameters:
        session: Optional session data; not used here.

    Returns:
        A dictionary with the configured crop values under `"imagecrop"`.

    """
    configuration = self.filter_config.configuration
    if configuration:
        crop_data = CropDataModel(**configuration)
    else:
        # NOTE(review): `CropDataModel()` is built without a `crop` value;
        # if that field is required this presumably fails validation -
        # confirm the intended behaviour for an empty configuration.
        crop_data = CropDataModel()
    return {"imagecrop": crop_data.crop}
initialize(self, session=None)
¶
Initialize strategy and return a dictionary.
Source code in oteapi/strategies/filter/crop_filter.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize strategy and return a dictionary.

    Parameters:
        session: Optional session data; not used here.

    Returns:
        The fixed dictionary `{"result": "collectionid"}`.

    """
    return dict(result="collectionid")
sql_query_filter
¶
SQL query filter strategy.
SQLQueryFilter
dataclass
¶
Strategy for a SQL query filter.
Registers strategies:
("filterType", "filter/sql")
Source code in oteapi/strategies/filter/sql_query_filter.py
@dataclass
@StrategyFactory.register(("filterType", "filter/sql"))
class SQLQueryFilter:
    """Strategy for a SQL query filter.

    **Registers strategies**:

    - `("filterType", "filter/sql")`

    """

    filter_config: "FilterConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize strategy and return a dictionary

        The query from the filter configuration is validated through
        `SqlQueryDataModel`.

        Parameters:
            session: Optional session data; not used here.

        Returns:
            A dictionary with the query string under `"sqlquery"`.

        """
        validated = SqlQueryDataModel(query=self.filter_config.query)
        return {"sqlquery": validated.query}

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Execute strategy and return a dictionary

        Parameters:
            session: Optional session data; not used here.

        Returns:
            An empty dictionary.

        """
        return dict()
get(self, session=None)
¶
Execute strategy and return a dictionary
Source code in oteapi/strategies/filter/sql_query_filter.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Execute strategy and return a dictionary

    Parameters:
        session: Optional session data; not used here.

    Returns:
        An empty dictionary.

    """
    return dict()
initialize(self, session=None)
¶
Initialize strategy and return a dictionary
Source code in oteapi/strategies/filter/sql_query_filter.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize strategy and return a dictionary

    The query from the filter configuration is validated through
    `SqlQueryDataModel`.

    Parameters:
        session: Optional session data; not used here.

    Returns:
        A dictionary with the query string under `"sqlquery"`.

    """
    validated = SqlQueryDataModel(query=self.filter_config.query)
    return {"sqlquery": validated.query}
SqlQueryDataModel (BaseModel)
pydantic-model
¶
SQL Query data model.
Source code in oteapi/strategies/filter/sql_query_filter.py
class SqlQueryDataModel(BaseModel):
    """Model holding a single SQL query."""

    # Required field: `...` marks it as having no default.
    query: str = Field(
        default=...,
        description="A SQL query string.",
    )
query: str
pydantic-field
required
¶
A SQL query string.
parse
special
¶
application_vnd_sqlite
¶
Strategy class for application/vnd.sqlite3.
SqliteParseStrategy
dataclass
¶
Parse strategy for SQLite.
Registers strategies:
("mediaType", "application/vnd.sqlite3")
Source code in oteapi/strategies/parse/application_vnd_sqlite.py
@dataclass
@StrategyFactory.register(("mediaType", "application/vnd.sqlite3"))
class SqliteParseStrategy:
    """Parse strategy for SQLite.

    **Registers strategies**:

    - `("mediaType", "application/vnd.sqlite3")`

    """

    resource_config: "ResourceConfig"

    def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Parse SQLite query responses.

        Parameters:
            session: Session data. `"sqlquery"` holds the query to run and
                `"filename"` the database file to run it against.

        Returns:
            A dictionary with all fetched rows under `"result"`, or with the
            text `"No query given"` when the session has no `"sqlquery"`.

        Raises:
            ValueError: If `session` is `None` or the database could not be
                opened.

        """
        if session is None:
            raise ValueError("Missing session")
        if "sqlquery" not in session:
            return {"result": "No query given"}

        connection = create_connection(session["filename"])
        if connection is None:
            # `create_connection` reports failure by returning `None`.
            raise ValueError(
                f"Could not open SQLite database {session['filename']!r}"
            )
        try:
            rows = connection.cursor().execute(session["sqlquery"]).fetchall()
        finally:
            # Always release the database handle, also when the query fails.
            connection.close()
        return {"result": rows}

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize.

        Parameters:
            session: Optional session data; not used here.

        Returns:
            An empty dictionary.

        """
        return {}
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/parse/application_vnd_sqlite.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Prepare the SQLite parse strategy; there is nothing to set up.

    Parameters:
        session: Optional session data; ignored.

    Returns:
        A new, empty dictionary.

    """
    return dict()
parse(self, session=None)
¶
Parse SQLite query responses.
Source code in oteapi/strategies/parse/application_vnd_sqlite.py
def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Parse SQLite query responses.

    Parameters:
        session: Session data. `"sqlquery"` holds the query to run and
            `"filename"` the database file to run it against.

    Returns:
        A dictionary with all fetched rows under `"result"`, or with the
        text `"No query given"` when the session has no `"sqlquery"`.

    Raises:
        ValueError: If `session` is `None` or the database could not be
            opened.

    """
    if session is None:
        raise ValueError("Missing session")
    if "sqlquery" not in session:
        return {"result": "No query given"}

    connection = create_connection(session["filename"])
    if connection is None:
        # `create_connection` reports failure by returning `None`.
        raise ValueError(
            f"Could not open SQLite database {session['filename']!r}"
        )
    try:
        rows = connection.cursor().execute(session["sqlquery"]).fetchall()
    finally:
        # Always release the database handle, also when the query fails.
        connection.close()
    return {"result": rows}
create_connection(db_file)
¶
Create a database connection to the SQLite database specified by `db_file`.
Parameters: `db_file` – database file.
Returns: Connection object or None.
Source code in oteapi/strategies/parse/application_vnd_sqlite.py
def create_connection(db_file):
    """Open a connection to the SQLite database stored in `db_file`.

    Parameters:
        db_file: Database file.

    Returns:
        The connection object, or `None` if connecting raised an
        `sqlite3.Error` (the error is printed).

    """
    try:
        return sqlite3.connect(db_file)
    except sqlite3.Error as exc:
        print(exc)
    return None
excel_xlsx
¶
Strategy class for workbook/xlsx.
XLSXParseDataModel (BaseModel)
pydantic-model
¶
Data model for retrieving a rectangular section of an Excel sheet.
Source code in oteapi/strategies/parse/excel_xlsx.py
class XLSXParseDataModel(BaseModel):
    """Describes which rectangular part of an Excel sheet should be read."""

    # --- Sheet selection (required) ---
    worksheet: str = Field(default=..., description="Name of worksheet to load.")

    # --- Row/column bounds (all optional) ---
    row_from: Optional[int] = Field(
        default=None,
        description="Excel row number of first row. Defaults to first assigned row.",
    )
    col_from: Optional[Union[int, str]] = Field(
        default=None,
        description=(
            "Excel column number or label of first column. "
            "Defaults to first assigned column."
        ),
    )
    row_to: Optional[int] = Field(
        default=None,
        description="Excel row number of last row. Defaults to last assigned row.",
    )
    col_to: Optional[Union[int, str]] = Field(
        default=None,
        description=(
            "Excel column number or label of last column. "
            "Defaults to last assigned column."
        ),
    )

    # --- Header handling (all optional) ---
    header_row: Optional[int] = Field(
        default=None,
        description=(
            "Row number with the headers. "
            "Defaults to `1` if header is given, otherwise `None`."
        ),
    )
    header: Optional[List[str]] = Field(
        default=None,
        description=(
            "Optional list of column names, specifying the columns to return. "
            "These names they should match cells in `header_row`."
        ),
    )
    new_header: Optional[List[str]] = Field(
        default=None,
        description="Optional list of new column names replacing `header` in the output.",
    )
col_from: Union[int, str]
pydantic-field
¶
Excel column number or label of first column. Defaults to first assigned column.
col_to: Union[int, str]
pydantic-field
¶
Excel column number or label of last column. Defaults to last assigned column.
header: List[str]
pydantic-field
¶
Optional list of column names, specifying the columns to return. These names should match cells in `header_row`.
header_row: int
pydantic-field
¶
Row number with the headers. Defaults to `1` if header is given, otherwise `None`.
new_header: List[str]
pydantic-field
¶
Optional list of new column names replacing `header` in the output.
row_from: int
pydantic-field
¶
Excel row number of first row. Defaults to first assigned row.
row_to: int
pydantic-field
¶
Excel row number of last row. Defaults to last assigned row.
worksheet: str
pydantic-field
required
¶
Name of worksheet to load.
XLSXParseStrategy
dataclass
¶
Parse strategy for Excel XLSX files.
Registers strategies:
("mediaType", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
Source code in oteapi/strategies/parse/excel_xlsx.py
@dataclass
@StrategyFactory.register(
    ("mediaType", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
)
class XLSXParseStrategy:
    """Parse strategy for Excel XLSX files.

    **Registers strategies**:

    - `("mediaType", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")`

    """

    resource_config: "ResourceConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize.

        Parameters:
            session: Optional session data; not used here.

        Returns:
            An empty dictionary.

        """
        return {}

    def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Parses selected region of an excel file.

        Parameters:
            session: Optional session data; not used here.

        Returns:
            A dict with column-name/column-value pairs. The values are lists.

        Raises:
            TypeError: If the length of `new_header` differs from the number
                of columns.

        """
        model = XLSXParseDataModel(
            **self.resource_config.configuration, extra=Extra.ignore
        )
        downloader = create_download_strategy(self.resource_config)
        output = downloader.get()
        cache = DataCache(self.resource_config.configuration)

        with cache.getfile(key=output["key"], suffix=".xlsx") as filename:
            workbook = load_workbook(filename=filename, read_only=True, data_only=True)
            try:
                worksheet = workbook[model.worksheet]
                set_model_defaults(model, worksheet)
                columns = list(get_column_indices(model, worksheet))
                first_col, last_col = min(columns), max(columns)

                # `iter_rows` yields tuples starting at `first_col`, so a
                # sheet column `c` sits at tuple position `c - first_col`.
                data = [
                    [row[c - first_col].value for c in columns]
                    for row in worksheet.iter_rows(
                        min_row=model.row_from,
                        max_row=model.row_to,
                        min_col=first_col,
                        max_col=last_col,
                    )
                ]

                header = None
                if model.header_row:
                    header_cells = next(
                        worksheet.iter_rows(
                            min_row=model.header_row,
                            max_row=model.header_row,
                            min_col=first_col,
                            max_col=last_col,
                        )
                    )
                    header = [header_cells[c - first_col].value for c in columns]
            finally:
                # Workbooks opened with `read_only=True` keep the file open
                # until they are closed explicitly.
                workbook.close()

        if model.new_header:
            nhead = len(header) if header else len(data[0]) if data else 0
            if len(model.new_header) != nhead:
                raise TypeError(
                    f"length of `new_header` (={len(model.new_header)}) "
                    f"doesn't match number of columns (={nhead})"
                )
            if header:
                # A `None` entry in `new_header` keeps the original name.
                header = [
                    old if new is None else new
                    for old, new in zip(header, model.new_header)
                ]
            elif data:
                header = model.new_header

        if header is None:
            # One letter (A, B, ...) per selected column - not per data row.
            header = [get_column_letter(i + 1) for i in range(len(columns))]

        transposed = [list(column) for column in zip(*data)]
        return dict(zip(header, transposed))
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/parse/excel_xlsx.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Prepare the XLSX parse strategy; there is nothing to set up.

    Parameters:
        session: Optional session data; ignored.

    Returns:
        A new, empty dictionary.

    """
    empty: dict = {}
    return empty
parse(self, session=None)
¶
Parses selected region of an excel file.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
A dict with column-name/column-value pairs. The values are lists. |
Source code in oteapi/strategies/parse/excel_xlsx.py
def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Parses selected region of an excel file.

    Parameters:
        session: Optional session data; not used here.

    Returns:
        A dict with column-name/column-value pairs. The values are lists.

    Raises:
        TypeError: If the length of `new_header` differs from the number
            of columns.

    """
    model = XLSXParseDataModel(
        **self.resource_config.configuration, extra=Extra.ignore
    )
    downloader = create_download_strategy(self.resource_config)
    output = downloader.get()
    cache = DataCache(self.resource_config.configuration)

    with cache.getfile(key=output["key"], suffix=".xlsx") as filename:
        workbook = load_workbook(filename=filename, read_only=True, data_only=True)
        try:
            worksheet = workbook[model.worksheet]
            set_model_defaults(model, worksheet)
            columns = list(get_column_indices(model, worksheet))
            first_col, last_col = min(columns), max(columns)

            # `iter_rows` yields tuples starting at `first_col`, so a
            # sheet column `c` sits at tuple position `c - first_col`.
            data = [
                [row[c - first_col].value for c in columns]
                for row in worksheet.iter_rows(
                    min_row=model.row_from,
                    max_row=model.row_to,
                    min_col=first_col,
                    max_col=last_col,
                )
            ]

            header = None
            if model.header_row:
                header_cells = next(
                    worksheet.iter_rows(
                        min_row=model.header_row,
                        max_row=model.header_row,
                        min_col=first_col,
                        max_col=last_col,
                    )
                )
                header = [header_cells[c - first_col].value for c in columns]
        finally:
            # Workbooks opened with `read_only=True` keep the file open
            # until they are closed explicitly.
            workbook.close()

    if model.new_header:
        nhead = len(header) if header else len(data[0]) if data else 0
        if len(model.new_header) != nhead:
            raise TypeError(
                f"length of `new_header` (={len(model.new_header)}) "
                f"doesn't match number of columns (={nhead})"
            )
        if header:
            # A `None` entry in `new_header` keeps the original name.
            header = [
                old if new is None else new
                for old, new in zip(header, model.new_header)
            ]
        elif data:
            header = model.new_header

    if header is None:
        # One letter (A, B, ...) per selected column - not per data row.
        header = [get_column_letter(i + 1) for i in range(len(columns))]

    transposed = [list(column) for column in zip(*data)]
    return dict(zip(header, transposed))
get_column_indices(model, worksheet)
¶
Helper function returning a list of column indices.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
XLSXParseDataModel |
The parsed data model. |
required |
worksheet |
Worksheet |
Excel worksheet, from which the header values will be retrieved. |
required |
Returns:
Type | Description |
---|---|
Iterable[int] |
A list of column indices. |
Source code in oteapi/strategies/parse/excel_xlsx.py
def get_column_indices(
    model: XLSXParseDataModel, worksheet: "Worksheet"
) -> "Iterable[int]":
    """Work out which worksheet columns should be read.

    Parameters:
        model: The parsed data model.
        worksheet: Excel worksheet, from which the header values will be retrieved.

    Returns:
        The columns matching `model.header` (in that order) if a header is
        given, otherwise the range from `model.col_from` to `model.col_to`
        inclusive.

    Raises:
        TypeError: If `model.col_from` or `model.col_to` is not an integer.

    """
    first, last = model.col_from, model.col_to
    if not (isinstance(first, int) and isinstance(last, int)):
        raise TypeError("Expected `model.col_from` and `model.col_to` to be integers.")

    span = range(first, last + 1)
    if not model.header:
        return span

    # Map every header cell value in the span to its column number.
    column_by_title = {}
    for col in span:
        column_by_title[worksheet.cell(model.header_row, col).value] = col
    return [column_by_title[title] for title in model.header]
set_model_defaults(model, worksheet)
¶
Update data model `model` with default values obtained from `worksheet`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
XLSXParseDataModel |
The parsed data model. |
required |
worksheet |
Worksheet |
Excel worksheet, from which the default values will be obtained. |
required |
Source code in oteapi/strategies/parse/excel_xlsx.py
def set_model_defaults(model: XLSXParseDataModel, worksheet: "Worksheet") -> None:
    """Update data model `model` with default values obtained from `worksheet`.

    Unset bounds are taken from the worksheet dimensions and column labels
    are converted to column numbers. `model` is modified in place.

    Parameters:
        model: The parsed data model.
        worksheet: Excel worksheet, from which the default values will be obtained.

    """
    # Resolve the header row first: the default for `row_from` depends on it.
    # Doing this last would let the data start on the header row itself.
    if model.header and not model.header_row:
        model.header_row = 1

    if model.row_from is None:
        if model.header:
            # assume that data starts on the first row after the header
            model.row_from = model.header_row + 1
        else:
            model.row_from = worksheet.min_row

    if model.row_to is None:
        model.row_to = worksheet.max_row

    if model.col_from is None:
        model.col_from = worksheet.min_column
    elif isinstance(model.col_from, str):
        model.col_from = column_index_from_string(model.col_from)

    if model.col_to is None:
        model.col_to = worksheet.max_column
    elif isinstance(model.col_to, str):
        model.col_to = column_index_from_string(model.col_to)
image_jpeg
¶
Strategy class for image/jpg.
ImageDataParseStrategy
dataclass
¶
Parse strategy for images.
Registers strategies:
("mediaType", "image/jpg")
("mediaType", "image/jpeg")
("mediaType", "image/j2p")
("mediaType", "image/png")
("mediaType", "image/gif")
("mediaType", "image/tiff")
("mediaType", "image/eps")
Source code in oteapi/strategies/parse/image_jpeg.py
@dataclass
@StrategyFactory.register(
    ("mediaType", "image/jpg"),
    ("mediaType", "image/jpeg"),
    ("mediaType", "image/j2p"),
    ("mediaType", "image/png"),
    ("mediaType", "image/gif"),
    ("mediaType", "image/tiff"),
    ("mediaType", "image/eps"),
)
class ImageDataParseStrategy:
    """Parse strategy for images.

    **Registers strategies**:

    - `("mediaType", "image/jpg")`
    - `("mediaType", "image/jpeg")`
    - `("mediaType", "image/j2p")`
    - `("mediaType", "image/png")`
    - `("mediaType", "image/gif")`
    - `("mediaType", "image/tiff")`
    - `("mediaType", "image/eps")`

    """

    resource_config: "ResourceConfig"

    def __post_init__(self):
        """Derive the working attributes from the resource configuration.

        Raises:
            KeyError: If the configuration has no `"artifactName"` entry.

        """
        self.localpath = "/ote-data"
        # Work on a copy so that merging session data in `parse()` does not
        # modify the resource configuration itself, and treat a missing
        # configuration as empty before looking anything up in it.
        self.conf = dict(self.resource_config.configuration or {})
        self.filename = self.conf["artifactName"]

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize.

        Parameters:
            session: Optional session data; not used here.

        Returns:
            An empty dictionary.

        """
        return {}

    def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Crop the image if a `"crop"` entry is configured.

        Parameters:
            session: Optional session data, merged into the configuration
                before it is inspected.

        Returns:
            A dictionary with `"parseImage"` set to `"Done"`. If the image
            was cropped, `"cropped_filename"` holds the path of the saved
            result.

        """
        if session is not None:
            self.conf.update(session)

        parsedOutput = {}
        if "crop" in self.conf:
            print("cropping!")
            cropped_filename = f"{self.localpath}/cropped_{self.filename}"
            # The context manager closes the source image file once the
            # cropped copy has been written.
            with Image.open(f"{self.localpath}/{self.filename}") as im:
                im.crop(tuple(self.conf["crop"])).save(cropped_filename)
            parsedOutput["cropped_filename"] = cropped_filename
        parsedOutput["parseImage"] = "Done"
        return parsedOutput
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/parse/image_jpeg.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Prepare the image parse strategy; there is nothing to set up.

    Parameters:
        session: Optional session data; ignored.

    Returns:
        A new, empty dictionary.

    """
    return dict()
text_csv
¶
Strategy class for text/csv.
CSVParseStrategy
dataclass
¶
Parse strategy for CSV files.
Registers strategies:
("mediaType", "text/csv")
Source code in oteapi/strategies/parse/text_csv.py
@dataclass
@StrategyFactory.register(("mediaType", "text/csv"))
class CSVParseStrategy:
    """Parse strategy for CSV files.

    **Registers strategies**:

    - `("mediaType", "text/csv")`

    """

    resource_config: "ResourceConfig"

    def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Parse CSV.

        Currently this only prints a message.

        Parameters:
            session: Optional session data; not used here.

        Returns:
            An empty dictionary.

        """
        print("CSV in action!")
        return dict()

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize.

        Parameters:
            session: Optional session data; not used here.

        Returns:
            An empty dictionary.

        """
        return dict()
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/parse/text_csv.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Prepare the CSV parse strategy; there is nothing to set up.

    Parameters:
        session: Optional session data; ignored.

    Returns:
        A new, empty dictionary.

    """
    return dict()
parse(self, session=None)
¶
Parse CSV.
Source code in oteapi/strategies/parse/text_csv.py
def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Parse CSV.

    Currently this only prints a message.

    Parameters:
        session: Optional session data; not used here.

    Returns:
        An empty dictionary.

    """
    print("CSV in action!")
    return dict()
text_json
¶
Strategy class for text/json.
JSONDataParseStrategy
dataclass
¶
Parse strategy for JSON.
Registers strategies:
("mediaType", "text/json")
Source code in oteapi/strategies/parse/text_json.py
@dataclass
@StrategyFactory.register(("mediaType", "text/json"))
class JSONDataParseStrategy:
    """Parse strategy for JSON.

    **Registers strategies**:

    - `("mediaType", "text/json")`

    """

    resource_config: "ResourceConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize.

        Parameters:
            session: Optional session data; not used here.

        Returns:
            An empty dictionary.

        """
        return dict()

    def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Parse json.

        The resource is fetched through its download strategy and then read
        back from the data cache.

        Parameters:
            session: Optional session data; not used here.

        Returns:
            The cached content itself if it is already a dictionary,
            otherwise the result of decoding it with `json.loads`.

        """
        key = create_download_strategy(self.resource_config).get()["key"]
        cache = DataCache(self.resource_config.configuration)
        content = cache.get(key)
        return content if isinstance(content, dict) else json.loads(content)
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/parse/text_json.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Prepare the JSON parse strategy; there is nothing to set up.

    Parameters:
        session: Optional session data; ignored.

    Returns:
        A new, empty dictionary.

    """
    return dict()
parse(self, session=None)
¶
Parse json.
Source code in oteapi/strategies/parse/text_json.py
def parse(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Parse json.

    The resource is fetched through its download strategy and then read
    back from the data cache.

    Parameters:
        session: Optional session data; not used here.

    Returns:
        The cached content itself if it is already a dictionary,
        otherwise the result of decoding it with `json.loads`.

    """
    key = create_download_strategy(self.resource_config).get()["key"]
    cache = DataCache(self.resource_config.configuration)
    content = cache.get(key)
    return content if isinstance(content, dict) else json.loads(content)
transformation
special
¶
celery_remote
¶
Transformation Plugin that uses the Celery framework to call remote workers.
CeleryConfig (BaseModel)
pydantic-model
¶
Celery configuration.
Source code in oteapi/strategies/transformation/celery_remote.py
class CeleryConfig(BaseModel):
    """Settings describing which Celery task to submit."""

    # Both fields are required: `...` marks them as having no default.
    taskName: str = Field(
        default=...,
        description="A task name.",
    )
    args: List[Any] = Field(
        default=...,
        description="List of arguments for the task.",
    )
CeleryRemoteStrategy
dataclass
¶
Submit job to remote Celery runner.
Registers strategies:
("transformation_type", "celery/remote")
Source code in oteapi/strategies/transformation/celery_remote.py
@dataclass
@StrategyFactory.register(("transformation_type", "celery/remote"))
class CeleryRemoteStrategy:
    """Submit job to remote Celery runner.

    **Registers strategies**:

    - `("transformation_type", "celery/remote")`

    """

    transformation_config: "TransformationConfig"

    def run(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Run a job, return a job ID.

        Parameters:
            session: Optional session data, passed to the task as its
                keyword arguments.

        Returns:
            A dictionary with the ID of the submitted task under `"result"`.

        """
        config = self.transformation_config.configuration
        if config is None:
            # NOTE(review): `CeleryConfig()` is built without `taskName` and
            # `args`; if those fields are required this presumably fails
            # validation - confirm the intended behaviour.
            celery_config = CeleryConfig()
        else:
            celery_config = CeleryConfig(**config)
        submitted = app.send_task(
            celery_config.taskName, celery_config.args, kwargs=session
        )
        return {"result": submitted.task_id}

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize a job.

        Parameters:
            session: Optional session data; not used here.

        Returns:
            An empty dictionary.

        """
        return dict()

    def status(self, task_id: str) -> TransformationStatus:
        """Get job status.

        Parameters:
            task_id: ID of the task to look up.

        Returns:
            A status object carrying `task_id` and the state of the task.

        """
        task = AsyncResult(id=task_id, app=app)
        return TransformationStatus(id=task_id, status=task.state)

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Get transformation.

        Parameters:
            session: Optional session data; not used here.

        Returns:
            An empty dictionary.

        """
        # TODO: update and return global state  # pylint: disable=fixme
        return dict()
get(self, session=None)
¶
Get transformation.
Source code in oteapi/strategies/transformation/celery_remote.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Get transformation.

    Parameters:
        session: Optional session data; not used here.

    Returns:
        An empty dictionary.

    """
    # TODO: update and return global state  # pylint: disable=fixme
    return dict()
initialize(self, session=None)
¶
Initialize a job.
Source code in oteapi/strategies/transformation/celery_remote.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Initialize a job.

    Parameters:
        session: Optional session data; not used here.

    Returns:
        An empty dictionary.

    """
    return dict()
run(self, session=None)
¶
Run a job, return a job ID.
Source code in oteapi/strategies/transformation/celery_remote.py
def run(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Run a job, return a job ID.

    Parameters:
        session: Optional session data, passed to the task as its
            keyword arguments.

    Returns:
        A dictionary with the ID of the submitted task under `"result"`.

    """
    config = self.transformation_config.configuration
    if config is None:
        # NOTE(review): `CeleryConfig()` is built without `taskName` and
        # `args`; if those fields are required this presumably fails
        # validation - confirm the intended behaviour.
        celery_config = CeleryConfig()
    else:
        celery_config = CeleryConfig(**config)
    submitted = app.send_task(
        celery_config.taskName, celery_config.args, kwargs=session
    )
    return {"result": submitted.task_id}
status(self, task_id)
¶
Get job status.
Source code in oteapi/strategies/transformation/celery_remote.py
def status(self, task_id: str) -> TransformationStatus:
    """Get job status.

    Parameters:
        task_id: ID of the task to look up.

    Returns:
        A status object carrying `task_id` and the state of the task.

    """
    task = AsyncResult(id=task_id, app=app)
    return TransformationStatus(id=task_id, status=task.state)