OTE-API Core Strategies¶
This page provides documentation for the oteapi.strategies
submodule, where all the core OTE-API strategies are located.
These strategies will always be available when setting up a server based on the OTE-API Core package.
download
special
¶
file
¶
Download strategy class for the `file` scheme.
FileConfig (BaseModel)
pydantic-model
¶
File-specific Configuration Data Model.
Source code in oteapi/strategies/download/file.py
class FileConfig(BaseModel):
    """Configuration options read by the `file` download strategy."""

    # `True` selects text mode; `False` (the default) selects bytes mode.
    text: bool = Field(
        default=False,
        description=(
            "Whether the file should be opened in text mode. If `False`, the file will"
            " be opened in bytes mode."
        ),
    )

    # Text encoding; `None` leaves the choice to the platform default.
    encoding: Optional[str] = Field(
        default=None,
        description=(
            "Encoding used when opening the file. The default is platform dependent."
        ),
    )
FileStrategy
dataclass
¶
Strategy for retrieving data from a local file.
Registers strategies:
("scheme", "file")
Source code in oteapi/strategies/download/file.py
@dataclass
class FileStrategy:
    """Strategy for retrieving data from a local file.

    **Registers strategies**:

    - `("scheme", "file")`

    """

    download_config: "ResourceConfig"

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; nothing is prepared here.

        Parameters:
            session: Current session data; not used.

        Returns:
            A `SessionUpdate` created without any fields.

        """
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateFile:
        """Read the local file and store its content in the data cache.

        Parameters:
            session: Current session data; not used.

        Returns:
            The cache key under which the file content can be retrieved.

        Raises:
            ValueError: If `downloadUrl` is missing or its scheme is not `file`.

        """
        url = self.download_config.downloadUrl
        if url is None or url.scheme != "file":
            raise ValueError(
                "Expected 'downloadUrl' to have scheme 'file' in the configuration."
            )

        filename = Path(url.path).resolve()
        if isinstance(filename, PosixPath):
            # On POSIX the URL host is put back as the leading path component.
            filename = Path("/" + url.host + str(filename))

        cache = DataCache(self.download_config.configuration)
        access_key = cache.config.accessKey
        if access_key and access_key in cache:
            # Content is already cached under the configured key.
            return SessionUpdateFile(key=access_key)

        config = FileConfig(**self.download_config.configuration)
        if config.text:
            payload = filename.read_text(encoding=config.encoding)
        else:
            payload = filename.read_bytes()
        return SessionUpdateFile(key=cache.add(payload))
get(self, session=None)
¶
Read local file.
Source code in oteapi/strategies/download/file.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateFile:
    """Read the local file and store its content in the data cache.

    Parameters:
        session: Current session data; not used.

    Returns:
        The cache key under which the file content can be retrieved.

    Raises:
        ValueError: If `downloadUrl` is missing or its scheme is not `file`.

    """
    url = self.download_config.downloadUrl
    if url is None or url.scheme != "file":
        raise ValueError(
            "Expected 'downloadUrl' to have scheme 'file' in the configuration."
        )

    filename = Path(url.path).resolve()
    if isinstance(filename, PosixPath):
        # On POSIX the URL host is put back as the leading path component.
        filename = Path("/" + url.host + str(filename))

    cache = DataCache(self.download_config.configuration)
    access_key = cache.config.accessKey
    if access_key and access_key in cache:
        # Content is already cached under the configured key.
        return SessionUpdateFile(key=access_key)

    config = FileConfig(**self.download_config.configuration)
    if config.text:
        payload = filename.read_text(encoding=config.encoding)
    else:
        payload = filename.read_bytes()
    return SessionUpdateFile(key=cache.add(payload))
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/download/file.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; nothing is prepared here.

    Parameters:
        session: Current session data; not used.

    Returns:
        A `SessionUpdate` created without any fields.

    """
    return SessionUpdate()
SessionUpdateFile (SessionUpdate)
pydantic-model
¶
Class for returning values from Download File strategy.
Source code in oteapi/strategies/download/file.py
class SessionUpdateFile(SessionUpdate):
    """Session update produced by the `file` download strategy."""

    # Cache key returned by `FileStrategy.get`.
    key: str = Field(
        ...,
        description="Key to access the data in the cache.",
    )
key: str
pydantic-field
required
¶
Key to access the data in the cache.
https
¶
Download strategy class for http/https
HTTPSStrategy
dataclass
¶
Strategy for retrieving data via http.
Registers strategies:
("scheme", "http")
("scheme", "https")
Source code in oteapi/strategies/download/https.py
@dataclass
class HTTPSStrategy:
    """Strategy for retrieving data via http.

    **Registers strategies**:

    - `("scheme", "http")`
    - `("scheme", "https")`

    """

    download_config: "ResourceConfig"

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; nothing is prepared here.

        Parameters:
            session: Current session data; not used.

        Returns:
            A `SessionUpdate` created without any fields.

        """
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateHTTPS:
        """Download via http/https and store the response body in the data cache.

        If the configured `accessKey` is already present in the cache, no request
        is made and that key is returned.

        Parameters:
            session: Current session data; not used.

        Returns:
            The cache key under which the downloaded content can be retrieved.

        Raises:
            ValueError: If `downloadUrl` is not set in the configuration.
            requests.HTTPError: If the server answers with a 4xx/5xx status.

        """
        cache = DataCache(self.download_config.configuration)
        access_key = cache.config.accessKey
        if access_key and access_key in cache:
            return SessionUpdateHTTPS(key=access_key)

        if not self.download_config.downloadUrl:
            raise ValueError("downloadUrl not defined in configuration.")

        req = requests.get(self.download_config.downloadUrl, allow_redirects=True)
        # Fail loudly on error statuses instead of caching an error page as data.
        req.raise_for_status()
        return SessionUpdateHTTPS(key=cache.add(req.content))
get(self, session=None)
¶
Download via http/https and store on local cache.
Source code in oteapi/strategies/download/https.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateHTTPS:
    """Download via http/https and store the response body in the data cache.

    If the configured `accessKey` is already present in the cache, no request
    is made and that key is returned.

    Parameters:
        session: Current session data; not used.

    Returns:
        The cache key under which the downloaded content can be retrieved.

    Raises:
        ValueError: If `downloadUrl` is not set in the configuration.
        requests.HTTPError: If the server answers with a 4xx/5xx status.

    """
    cache = DataCache(self.download_config.configuration)
    access_key = cache.config.accessKey
    if access_key and access_key in cache:
        return SessionUpdateHTTPS(key=access_key)

    if not self.download_config.downloadUrl:
        raise ValueError("downloadUrl not defined in configuration.")

    req = requests.get(self.download_config.downloadUrl, allow_redirects=True)
    # Fail loudly on error statuses instead of caching an error page as data.
    req.raise_for_status()
    return SessionUpdateHTTPS(key=cache.add(req.content))
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/download/https.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; nothing is prepared here.

    Parameters:
        session: Current session data; not used.

    Returns:
        A `SessionUpdate` created without any fields.

    """
    return SessionUpdate()
SessionUpdateHTTPS (SessionUpdate)
pydantic-model
¶
Class for returning values from Download HTTPS strategy.
Source code in oteapi/strategies/download/https.py
class SessionUpdateHTTPS(SessionUpdate):
    """Session update produced by the http/https download strategy."""

    # Cache key returned by `HTTPSStrategy.get`.
    key: str = Field(
        ...,
        description="Key to access the data in the cache.",
    )
key: str
pydantic-field
required
¶
Key to access the data in the cache.
sftp
¶
Strategy class for sftp/ftp
SFTPStrategy
dataclass
¶
Strategy for retrieving data via sftp.
Registers strategies:
("scheme", "ftp")
("scheme", "sftp")
Source code in oteapi/strategies/download/sftp.py
@dataclass
class SFTPStrategy:
    """Strategy for retrieving data via sftp.

    **Registers strategies**:

    - `("scheme", "ftp")`
    - `("scheme", "sftp")`

    """

    download_config: "ResourceConfig"

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; nothing is prepared here.

        Parameters:
            session: Current session data; not used.

        Returns:
            A `SessionUpdate` created without any fields.

        """
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateSFTP:
        """Fetch the remote file over sftp and store its bytes in the data cache.

        If the configured `accessKey` is already present in the cache, no
        connection is opened and that key is returned.

        Parameters:
            session: Current session data; not used.

        Returns:
            The cache key under which the downloaded content can be retrieved.

        Raises:
            ValueError: If `downloadUrl` is not set in the configuration.

        """
        cache = DataCache(self.download_config.configuration)
        cached_key = cache.config.accessKey
        if cached_key and cached_key in cache:
            return SessionUpdateSFTP(key=cached_key)

        # NOTE(review): setting `hostkeys` to None appears to switch off pysftp's
        # host-key verification - confirm that this is acceptable for deployment.
        connection_options = pysftp.CnOpts()
        connection_options.hostkeys = None

        url = self.download_config.downloadUrl
        if not url:
            raise ValueError("downloadUrl is not defined in configuration.")

        with pysftp.Connection(
            host=url.host,
            username=url.user,
            password=url.password,
            port=url.port,
            cnopts=connection_options,
        ) as sftp:
            # Windows locks open files, so the temporary file is only used to
            # reserve a name and is closed again before the download writes to it.
            with NamedTemporaryFile(prefix="oteapi-sftp-", delete=False) as handle:
                localpath = Path(handle.name).resolve()
            try:
                sftp.get(url.path, localpath=localpath)
                key = cache.add(localpath.read_bytes())
            finally:
                # `delete=False` above means the file must be removed by hand.
                localpath.unlink()

        return SessionUpdateSFTP(key=key)
get(self, session=None)
¶
Download via sftp
Source code in oteapi/strategies/download/sftp.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateSFTP:
    """Fetch the remote file over sftp and store its bytes in the data cache.

    If the configured `accessKey` is already present in the cache, no
    connection is opened and that key is returned.

    Parameters:
        session: Current session data; not used.

    Returns:
        The cache key under which the downloaded content can be retrieved.

    Raises:
        ValueError: If `downloadUrl` is not set in the configuration.

    """
    cache = DataCache(self.download_config.configuration)
    cached_key = cache.config.accessKey
    if cached_key and cached_key in cache:
        return SessionUpdateSFTP(key=cached_key)

    # NOTE(review): setting `hostkeys` to None appears to switch off pysftp's
    # host-key verification - confirm that this is acceptable for deployment.
    connection_options = pysftp.CnOpts()
    connection_options.hostkeys = None

    url = self.download_config.downloadUrl
    if not url:
        raise ValueError("downloadUrl is not defined in configuration.")

    with pysftp.Connection(
        host=url.host,
        username=url.user,
        password=url.password,
        port=url.port,
        cnopts=connection_options,
    ) as sftp:
        # Windows locks open files, so the temporary file is only used to
        # reserve a name and is closed again before the download writes to it.
        with NamedTemporaryFile(prefix="oteapi-sftp-", delete=False) as handle:
            localpath = Path(handle.name).resolve()
        try:
            sftp.get(url.path, localpath=localpath)
            key = cache.add(localpath.read_bytes())
        finally:
            # `delete=False` above means the file must be removed by hand.
            localpath.unlink()

    return SessionUpdateSFTP(key=key)
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/download/sftp.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; nothing is prepared here.

    Parameters:
        session: Current session data; not used.

    Returns:
        A `SessionUpdate` created without any fields.

    """
    return SessionUpdate()
SessionUpdateSFTP (SessionUpdate)
pydantic-model
¶
Class for returning values from Download SFTP strategy.
Source code in oteapi/strategies/download/sftp.py
class SessionUpdateSFTP(SessionUpdate):
    """Session update produced by the sftp/ftp download strategy."""

    # Cache key returned by `SFTPStrategy.get`.
    key: str = Field(
        ...,
        description="Key to access the data in the cache.",
    )
key: str
pydantic-field
required
¶
Key to access the data in the cache.
filter
special
¶
crop_filter
¶
Demo-filter strategy
CropFilter
dataclass
¶
Strategy for cropping an image.
Registers strategies:
("filterType", "filter/crop")
Source code in oteapi/strategies/filter/crop_filter.py
@dataclass
class CropFilter:
    """Strategy for cropping an image.

    **Registers strategies**:

    - `("filterType", "filter/crop")`

    """

    filter_config: "FilterConfig"

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; nothing is prepared here.

        Parameters:
            session: Current session data; not used.

        Returns:
            A `SessionUpdate` created without any fields.

        """
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateCrop:
        """Expose the filter configuration as a crop session update.

        Parameters:
            session: Current session data; not used.

        Returns:
            A `SessionUpdateCrop` built from the filter configuration.

        """
        configuration = self.filter_config.configuration
        if configuration:
            return SessionUpdateCrop(**configuration)
        # NOTE(review): `crop` is declared as required on `SessionUpdateCrop`, so
        # this fallback presumably fails validation - confirm the intended default.
        return SessionUpdateCrop()
get(self, session=None)
¶
Execute strategy and return a dictionary
Source code in oteapi/strategies/filter/crop_filter.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateCrop:
    """Expose the filter configuration as a crop session update.

    Parameters:
        session: Current session data; not used.

    Returns:
        A `SessionUpdateCrop` built from the filter configuration.

    """
    configuration = self.filter_config.configuration
    if configuration:
        return SessionUpdateCrop(**configuration)
    # NOTE(review): `crop` is declared as required on `SessionUpdateCrop`, so
    # this fallback presumably fails validation - confirm the intended default.
    return SessionUpdateCrop()
initialize(self, session=None)
¶
Initialize strategy and return a dictionary.
Source code in oteapi/strategies/filter/crop_filter.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; nothing is prepared here.

    Parameters:
        session: Current session data; not used.

    Returns:
        A `SessionUpdate` created without any fields.

    """
    return SessionUpdate()
SessionUpdateCrop (SessionUpdate)
pydantic-model
¶
Class for returning values from crop data.
Source code in oteapi/strategies/filter/crop_filter.py
class SessionUpdateCrop(SessionUpdate):
    """Session update carrying the crop settings of the crop filter."""

    # Required list of integers describing the crop.
    crop: List[int] = Field(
        ...,
        description="List of image cropping details.",
    )
crop: List[int]
pydantic-field
required
¶
List of image cropping details.
sql_query_filter
¶
SQL query filter strategy.
SQLQueryFilter
dataclass
¶
Strategy for a SQL query filter.
Registers strategies:
("filterType", "filter/sql")
Source code in oteapi/strategies/filter/sql_query_filter.py
@dataclass
class SQLQueryFilter:
    """Strategy for a SQL query filter.

    **Registers strategies**:

    - `("filterType", "filter/sql")`

    """

    filter_config: "FilterConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> SessionUpdateSqlQuery:
        """Publish the configured SQL query to the session.

        Parameters:
            session: Current session data; not used.

        Returns:
            A session update whose `query` is the `query` of the filter
            configuration.

        """
        return SessionUpdateSqlQuery(query=self.filter_config.query)

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; the query is published in `initialize`.

        Parameters:
            session: Current session data; not used.

        Returns:
            A `SessionUpdate` created without any fields.

        """
        return SessionUpdate()
get(self, session=None)
¶
Execute strategy and return a dictionary
Source code in oteapi/strategies/filter/sql_query_filter.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update.

    Parameters:
        session: Current session data; not used.

    Returns:
        A `SessionUpdate` created without any fields.

    """
    return SessionUpdate()
initialize(self, session=None)
¶
Initialize strategy and return a dictionary
Source code in oteapi/strategies/filter/sql_query_filter.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> SessionUpdateSqlQuery:
    """Publish the configured SQL query to the session.

    Parameters:
        session: Current session data; not used.

    Returns:
        A session update whose `query` is the `query` of the filter
        configuration.

    """
    return SessionUpdateSqlQuery(query=self.filter_config.query)
SessionUpdateSqlQuery (SessionUpdate)
pydantic-model
¶
Class for returning values from SQL Query data model.
Source code in oteapi/strategies/filter/sql_query_filter.py
class SessionUpdateSqlQuery(SessionUpdate):
    """Session update carrying the SQL query of the SQL query filter."""

    # Required query string, set by `SQLQueryFilter.initialize`.
    query: str = Field(
        ...,
        description="A SQL query string.",
    )
query: str
pydantic-field
required
¶
A SQL query string.
parse
special
¶
application_json
¶
Strategy class for application/json.
JSONDataParseStrategy
dataclass
¶
Parse strategy for JSON.
Registers strategies:
("mediaType", "application/json")
Source code in oteapi/strategies/parse/application_json.py
@dataclass
class JSONDataParseStrategy:
    """Parse strategy for JSON.

    **Registers strategies**:

    - `("mediaType", "application/json")`

    """

    parse_config: "ResourceConfig"

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; nothing is prepared here.

        Parameters:
            session: Current session data; not used.

        Returns:
            A `SessionUpdate` created without any fields.

        """
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateJSONParse:
        """Download the resource and return its JSON content.

        A download strategy is created from the parse configuration and run;
        the content is then read back from the data cache using the returned
        key.

        Parameters:
            session: Current session data; not used.

        Returns:
            The JSON document as the `content` of the session update.

        """
        downloader = create_strategy("download", self.parse_config)
        download_output = downloader.get()

        cache = DataCache(self.parse_config.configuration)
        content = cache.get(download_output["key"])
        if not isinstance(content, dict):
            # Anything that is not already a dict is decoded as JSON text.
            content = json.loads(content)
        return SessionUpdateJSONParse(content=content)
get(self, session=None)
¶
Parse json.
Source code in oteapi/strategies/parse/application_json.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateJSONParse:
    """Download the resource and return its JSON content.

    A download strategy is created from the parse configuration and run;
    the content is then read back from the data cache using the returned
    key.

    Parameters:
        session: Current session data; not used.

    Returns:
        The JSON document as the `content` of the session update.

    """
    downloader = create_strategy("download", self.parse_config)
    download_output = downloader.get()

    cache = DataCache(self.parse_config.configuration)
    content = cache.get(download_output["key"])
    if not isinstance(content, dict):
        # Anything that is not already a dict is decoded as JSON text.
        content = json.loads(content)
    return SessionUpdateJSONParse(content=content)
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/parse/application_json.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; nothing is prepared here.

    Parameters:
        session: Current session data; not used.

    Returns:
        A `SessionUpdate` created without any fields.

    """
    return SessionUpdate()
SessionUpdateJSONParse (SessionUpdate)
pydantic-model
¶
Class for returning values from JSON Parse.
Source code in oteapi/strategies/parse/application_json.py
class SessionUpdateJSONParse(SessionUpdate):
    """Session update carrying the parsed JSON document."""

    # Required dict, filled in by `JSONDataParseStrategy.get`.
    content: dict = Field(
        ...,
        description="Content of the JSON document.",
    )
content: dict
pydantic-field
required
¶
Content of the JSON document.
application_vnd_sqlite
¶
Strategy class for application/vnd.sqlite3.
SessionUpdateSqLiteParse (SessionUpdate)
pydantic-model
¶
Configuration model for SqLiteParse.
Source code in oteapi/strategies/parse/application_vnd_sqlite.py
class SessionUpdateSqLiteParse(SessionUpdate):
    """Configuration model for SqLiteParse."""

    # Rows fetched by the query; stays `None` when no query was executed.
    result: Optional[list] = Field(None, description="List of results from the query.")
    # Required status text; the description previously misspelled "Message".
    msg: str = Field(..., description="Message concerning the execution of the query.")
SqliteParseStrategy
dataclass
¶
Parse strategy for SQLite.
Registers strategies:
("mediaType", "application/vnd.sqlite3")
Source code in oteapi/strategies/parse/application_vnd_sqlite.py
@dataclass
class SqliteParseStrategy:
    """Parse strategy for SQLite.

    **Registers strategies**:

    - `("mediaType", "application/vnd.sqlite3")`

    """

    parse_config: "ResourceConfig"

    def get(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> SessionUpdateSqLiteParse:
        """Run the session's SQL query against the session's SQLite file.

        Parameters:
            session: Session data. `sqlquery` holds the statement to execute and
                `filename` the database file; without `sqlquery` nothing is run.

        Returns:
            The fetched rows and a status message, or only a message when the
            session holds no query.

        Raises:
            ValueError: If `session` is `None`, or if the database could not be
                opened.

        """
        from contextlib import closing  # pylint: disable=import-outside-toplevel

        if session is None:
            raise ValueError("Missing session")

        if "sqlquery" not in session:
            return SessionUpdateSqLiteParse(msg="No query given")

        connection = create_connection(session["filename"])
        if connection is None:
            # `create_connection` returns None when opening fails; raise a clear
            # error instead of failing later with an AttributeError on None.
            raise ValueError(
                f"Could not open SQLite database {session['filename']!r}"
            )

        # Always release the connection, even if the query raises.
        with closing(connection):
            rows = connection.cursor().execute(session["sqlquery"]).fetchall()
        return SessionUpdateSqLiteParse(result=rows, msg="Query executed")

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; nothing is prepared here.

        Parameters:
            session: Current session data; not used.

        Returns:
            A `SessionUpdate` created without any fields.

        """
        return SessionUpdate()
get(self, session=None)
¶
Parse SQLite query responses.
Source code in oteapi/strategies/parse/application_vnd_sqlite.py
def get(
    self, session: "Optional[Dict[str, Any]]" = None
) -> SessionUpdateSqLiteParse:
    """Run the session's SQL query against the session's SQLite file.

    Parameters:
        session: Session data. `sqlquery` holds the statement to execute and
            `filename` the database file; without `sqlquery` nothing is run.

    Returns:
        The fetched rows and a status message, or only a message when the
        session holds no query.

    Raises:
        ValueError: If `session` is `None`, or if the database could not be
            opened.

    """
    from contextlib import closing  # pylint: disable=import-outside-toplevel

    if session is None:
        raise ValueError("Missing session")

    if "sqlquery" not in session:
        return SessionUpdateSqLiteParse(msg="No query given")

    connection = create_connection(session["filename"])
    if connection is None:
        # `create_connection` returns None when opening fails; raise a clear
        # error instead of failing later with an AttributeError on None.
        raise ValueError(
            f"Could not open SQLite database {session['filename']!r}"
        )

    # Always release the connection, even if the query raises.
    with closing(connection):
        rows = connection.cursor().execute(session["sqlquery"]).fetchall()
    return SessionUpdateSqLiteParse(result=rows, msg="Query executed")
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/parse/application_vnd_sqlite.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; nothing is prepared here.

    Parameters:
        session: Current session data; not used.

    Returns:
        A `SessionUpdate` created without any fields.

    """
    return SessionUpdate()
create_connection(db_file)
¶
Create a database connection to the SQLite database specified by db_file. Parameter db_file: the database file. Returns: a Connection object, or None if the connection could not be opened.
Source code in oteapi/strategies/parse/application_vnd_sqlite.py
def create_connection(db_file):
    """Open a connection to the SQLite database stored in `db_file`.

    Parameters:
        db_file: Database file to connect to.

    Returns:
        The open connection, or `None` if `sqlite3` reported an error. In that
        case the error is printed.

    """
    try:
        return sqlite3.connect(db_file)
    except sqlite3.Error as error:
        print(error)
    return None
excel_xlsx
¶
Strategy class for workbook/xlsx.
SessionUpdateXLSXParse (SessionUpdate)
pydantic-model
¶
Class for returning values from XLSXParse.
Source code in oteapi/strategies/parse/excel_xlsx.py
class SessionUpdateXLSXParse(SessionUpdate):
    """Session update carrying the table parsed from an Excel sheet."""

    # Required mapping from column name to the list of values in that column.
    data: Dict[str, list] = Field(
        ...,
        description=(
            "A dict with column-name/column-value pairs. The values are lists."
        ),
    )
data: Dict[str, list]
pydantic-field
required
¶
A dict with column-name/column-value pairs. The values are lists.
XLSXParseDataModel (BaseModel)
pydantic-model
¶
Data model for retrieving a rectangular section of an Excel sheet.
Source code in oteapi/strategies/parse/excel_xlsx.py
class XLSXParseDataModel(BaseModel):
    """Settings selecting a rectangular region of an Excel worksheet.

    Fields left at `None` are filled in later from the worksheet by
    `set_model_defaults`.
    """

    # --- Which sheet to read -------------------------------------------------
    worksheet: str = Field(..., description="Name of worksheet to load.")

    # --- Row / column bounds of the region -----------------------------------
    row_from: Optional[int] = Field(
        default=None,
        description="Excel row number of first row. Defaults to first assigned row.",
    )
    col_from: Optional[Union[int, str]] = Field(
        default=None,
        description=(
            "Excel column number or label of first column. Defaults to first assigned "
            "column."
        ),
    )
    row_to: Optional[int] = Field(
        default=None,
        description="Excel row number of last row. Defaults to last assigned row.",
    )
    col_to: Optional[Union[int, str]] = Field(
        default=None,
        description=(
            "Excel column number or label of last column. Defaults to last assigned "
            "column."
        ),
    )

    # --- Header handling -------------------------------------------------------
    header_row: Optional[int] = Field(
        default=None,
        description=(
            "Row number with the headers. Defaults to `1` if header is given, "
            "otherwise `None`."
        ),
    )
    header: Optional[List[str]] = Field(
        default=None,
        description=(
            "Optional list of column names, specifying the columns to return. "
            "These names they should match cells in `header_row`."
        ),
    )
    new_header: Optional[List[str]] = Field(
        default=None,
        description=(
            "Optional list of new column names replacing `header` in the output."
        ),
    )

    # --- Passed on to the download strategy ------------------------------------
    download_config: AttrDict = Field(
        default=AttrDict(),
        description="Configurations provided to a download strategy.",
    )
col_from: Union[int, str]
pydantic-field
¶
Excel column number or label of first column. Defaults to first assigned column.
col_to: Union[int, str]
pydantic-field
¶
Excel column number or label of last column. Defaults to last assigned column.
download_config: AttrDict
pydantic-field
¶
Configurations provided to a download strategy.
header: List[str]
pydantic-field
¶
Optional list of column names, specifying the columns to return. These names should match cells in `header_row`.
header_row: int
pydantic-field
¶
Row number with the headers. Defaults to `1` if `header` is given, otherwise `None`.
new_header: List[str]
pydantic-field
¶
Optional list of new column names replacing `header` in the output.
row_from: int
pydantic-field
¶
Excel row number of first row. Defaults to first assigned row.
row_to: int
pydantic-field
¶
Excel row number of last row. Defaults to last assigned row.
worksheet: str
pydantic-field
required
¶
Name of worksheet to load.
XLSXParseStrategy
dataclass
¶
Parse strategy for Excel XLSX files.
Registers strategies:
("mediaType", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
Source code in oteapi/strategies/parse/excel_xlsx.py
@dataclass
class XLSXParseStrategy:
    """Parse strategy for Excel XLSX files.

    **Registers strategies**:

    - `("mediaType", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")`

    """

    parse_config: "ResourceConfig"

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; nothing is prepared here.

        Parameters:
            session: Current session data; not used.

        Returns:
            A `SessionUpdate` created without any fields.

        """
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateXLSXParse:
        """Parses selected region of an excel file.

        The workbook is downloaded through a download strategy, read back from
        the data cache and opened read-only. Columns are named after the header
        row (optionally overridden by `new_header`); without a header they are
        labelled `A`, `B`, ... by their position in the selection.

        Parameters:
            session: Current session data; not used.

        Returns:
            A dict with column-name/column-value pairs. The values are lists.

        Raises:
            TypeError: If `new_header` does not have one entry per column.

        """
        model = XLSXParseDataModel(**self.parse_config.configuration)

        download_config = self.parse_config.copy()
        download_config.configuration = model.download_config
        downloader = create_strategy("download", download_config)
        output = downloader.get()

        cache = DataCache(self.parse_config.configuration)
        with cache.getfile(key=output["key"], suffix=".xlsx") as filename:
            workbook = load_workbook(filename=filename, read_only=True, data_only=True)
            try:
                worksheet = workbook[model.worksheet]
                set_model_defaults(model, worksheet)
                columns = list(get_column_indices(model, worksheet))
                first_col, last_col = min(columns), max(columns)
                # `iter_rows` yields tuples whose first cell is `min_col`, so the
                # worksheet column numbers must be shifted to index into a row.
                offsets = [col - first_col for col in columns]

                data = [
                    [row[offset].value for offset in offsets]
                    for row in worksheet.iter_rows(
                        min_row=model.row_from,
                        max_row=model.row_to,
                        min_col=first_col,
                        max_col=last_col,
                    )
                ]

                header = None
                if model.header_row:
                    header_cells = next(
                        worksheet.iter_rows(
                            min_row=model.header_row,
                            max_row=model.header_row,
                            min_col=first_col,
                            max_col=last_col,
                        )
                    )
                    header = [header_cells[offset].value for offset in offsets]
            finally:
                # Read-only workbooks keep the file open until closed explicitly.
                workbook.close()

        if model.new_header:
            nhead = len(header) if header else len(data[0]) if data else 0
            if len(model.new_header) != nhead:
                raise TypeError(
                    f"length of `new_header` (={len(model.new_header)}) "
                    f"doesn't match number of columns (={nhead})"
                )
            if header:
                # `None` entries keep the name read from the worksheet.
                for i, val in enumerate(model.new_header):
                    if val is not None:
                        header[i] = val
            elif data:
                header = model.new_header

        if header is None:
            # One label per selected column (not per data row).
            header = [get_column_letter(col + 1) for col in range(len(columns))]

        transposed = [list(datum) for datum in zip(*data)]
        return SessionUpdateXLSXParse(data=dict(zip(header, transposed)))
get(self, session=None)
¶
Parses selected region of an excel file.
Returns:
Type | Description |
---|---|
SessionUpdateXLSXParse |
A dict with column-name/column-value pairs. The values are lists. |
Source code in oteapi/strategies/parse/excel_xlsx.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateXLSXParse:
    """Parses selected region of an excel file.

    The workbook is downloaded through a download strategy, read back from
    the data cache and opened read-only. Columns are named after the header
    row (optionally overridden by `new_header`); without a header they are
    labelled `A`, `B`, ... by their position in the selection.

    Parameters:
        session: Current session data; not used.

    Returns:
        A dict with column-name/column-value pairs. The values are lists.

    Raises:
        TypeError: If `new_header` does not have one entry per column.

    """
    model = XLSXParseDataModel(**self.parse_config.configuration)

    download_config = self.parse_config.copy()
    download_config.configuration = model.download_config
    downloader = create_strategy("download", download_config)
    output = downloader.get()

    cache = DataCache(self.parse_config.configuration)
    with cache.getfile(key=output["key"], suffix=".xlsx") as filename:
        workbook = load_workbook(filename=filename, read_only=True, data_only=True)
        try:
            worksheet = workbook[model.worksheet]
            set_model_defaults(model, worksheet)
            columns = list(get_column_indices(model, worksheet))
            first_col, last_col = min(columns), max(columns)
            # `iter_rows` yields tuples whose first cell is `min_col`, so the
            # worksheet column numbers must be shifted to index into a row.
            offsets = [col - first_col for col in columns]

            data = [
                [row[offset].value for offset in offsets]
                for row in worksheet.iter_rows(
                    min_row=model.row_from,
                    max_row=model.row_to,
                    min_col=first_col,
                    max_col=last_col,
                )
            ]

            header = None
            if model.header_row:
                header_cells = next(
                    worksheet.iter_rows(
                        min_row=model.header_row,
                        max_row=model.header_row,
                        min_col=first_col,
                        max_col=last_col,
                    )
                )
                header = [header_cells[offset].value for offset in offsets]
        finally:
            # Read-only workbooks keep the file open until closed explicitly.
            workbook.close()

    if model.new_header:
        nhead = len(header) if header else len(data[0]) if data else 0
        if len(model.new_header) != nhead:
            raise TypeError(
                f"length of `new_header` (={len(model.new_header)}) "
                f"doesn't match number of columns (={nhead})"
            )
        if header:
            # `None` entries keep the name read from the worksheet.
            for i, val in enumerate(model.new_header):
                if val is not None:
                    header[i] = val
        elif data:
            header = model.new_header

    if header is None:
        # One label per selected column (not per data row).
        header = [get_column_letter(col + 1) for col in range(len(columns))]

    transposed = [list(datum) for datum in zip(*data)]
    return SessionUpdateXLSXParse(data=dict(zip(header, transposed)))
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/parse/excel_xlsx.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; nothing is prepared here.

    Parameters:
        session: Current session data; not used.

    Returns:
        A `SessionUpdate` created without any fields.

    """
    return SessionUpdate()
get_column_indices(model, worksheet)
¶
Helper function returning a list of column indices.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
XLSXParseDataModel |
The parsed data model. |
required |
worksheet |
Worksheet |
Excel worksheet, from which the header values will be retrieved. |
required |
Returns:
Type | Description |
---|---|
Iterable[int] |
A list of column indices. |
Source code in oteapi/strategies/parse/excel_xlsx.py
def get_column_indices(
    model: "XLSXParseDataModel", worksheet: "Worksheet"
) -> "Iterable[int]":
    """Work out which worksheet columns should be read.

    Without `model.header` every column from `col_from` to `col_to` is
    selected. With `model.header` the cells of `header_row` in that span are
    looked up, and the column of each requested name is returned in the order
    the names are listed.

    Parameters:
        model: The parsed data model; `col_from` and `col_to` must already be
            integers.
        worksheet: Excel worksheet, from which the header values will be retrieved.

    Returns:
        The selected column indices.

    Raises:
        TypeError: If `col_from` or `col_to` is not an integer.
        KeyError: If a requested header name is not found in the header row.

    """
    first, last = model.col_from, model.col_to
    if not (isinstance(first, int) and isinstance(last, int)):
        raise TypeError("Expected `model.col_from` and `model.col_to` to be integers.")

    candidates = range(first, last + 1)
    if not model.header:
        return candidates

    # Map each header cell value to its column; a repeated value keeps the
    # right-most column.
    column_by_title = {}
    for col in candidates:
        column_by_title[worksheet.cell(model.header_row, col).value] = col
    return [column_by_title[title] for title in model.header]
set_model_defaults(model, worksheet)
¶
Update data model `model` with default values obtained from `worksheet`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
XLSXParseDataModel |
The parsed data model. |
required |
worksheet |
Worksheet |
Excel worksheet, from which the default values will be obtained. |
required |
Source code in oteapi/strategies/parse/excel_xlsx.py
def set_model_defaults(model: "XLSXParseDataModel", worksheet: "Worksheet") -> None:
    """Update data model `model` with default values obtained from `worksheet`.

    Unset bounds are taken from the worksheet dimensions, column labels are
    converted to column numbers, and `header_row` defaults to `1` when
    `header` is given. `model` is modified in place.

    Parameters:
        model: The parsed data model.
        worksheet: Excel worksheet, from which the default values will be obtained.

    """
    # Resolve the header row first: the default for `row_from` depends on it.
    # Resolving it afterwards made the data start on the header row itself.
    if model.header and not model.header_row:
        model.header_row = 1

    if model.row_from is None:
        if model.header:
            # assume that data starts on the first row after the header
            model.row_from = model.header_row + 1
        else:
            model.row_from = worksheet.min_row

    if model.row_to is None:
        model.row_to = worksheet.max_row

    if model.col_from is None:
        model.col_from = worksheet.min_column
    elif isinstance(model.col_from, str):
        model.col_from = column_index_from_string(model.col_from)

    if model.col_to is None:
        model.col_to = worksheet.max_column
    elif isinstance(model.col_to, str):
        model.col_to = column_index_from_string(model.col_to)
image
¶
Strategy class for image/jpg.
ImageDataParseStrategy
dataclass
¶
Parse strategy for images.
Registers strategies:
("mediaType", "image/jpg")
("mediaType", "image/jpeg")
("mediaType", "image/jp2")
("mediaType", "image/png")
("mediaType", "image/gif")
("mediaType", "image/tiff")
("mediaType", "image/eps")
Source code in oteapi/strategies/parse/image.py
@dataclass
class ImageDataParseStrategy:
    """Parse strategy for images.

    **Registers strategies**:

    - `("mediaType", "image/jpg")`
    - `("mediaType", "image/jpeg")`
    - `("mediaType", "image/jp2")`
    - `("mediaType", "image/png")`
    - `("mediaType", "image/gif")`
    - `("mediaType", "image/tiff")`
    - `("mediaType", "image/eps")`

    """

    parse_config: "ResourceConfig"

    def __post_init__(self):
        """Read `filename` and the optional `localpath` from the configuration."""
        self.localpath = "/ote-data"
        self.filename = self.parse_config.configuration["filename"]
        self.conf = self.parse_config.configuration
        if "localpath" in self.conf:
            self.localpath = self.conf["localpath"]

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; nothing is prepared here.

        Parameters:
            session: Current session data; not used.

        Returns:
            A `SessionUpdate` created without any fields.

        """
        return SessionUpdate()

    def get(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> SessionUpdateImageParse:
        """Crop the configured image when a `crop` entry is available.

        The session, if given, is merged into the configuration first. With a
        `crop` entry the image is cropped and written next to the original as
        `cropped_<filename>`.

        Parameters:
            session: Current session data; merged into the configuration.

        Returns:
            The parsed output, containing `cropped_filename` when an image was
            cropped and always `parseImage`.

        """
        if session is not None:
            self.conf.update(session)

        parsedOutput = {}
        if "crop" in self.conf:
            print("cropping!")
            cropped_filename = f"{self.localpath}/cropped_{self.filename}"
            # The context manager closes the source image file once the crop
            # has been saved.
            with Image.open(f"{self.localpath}/{self.filename}") as im:
                im.crop(tuple(self.conf["crop"])).save(cropped_filename)
            parsedOutput["cropped_filename"] = cropped_filename

        parsedOutput["parseImage"] = "Done"
        return SessionUpdateImageParse(parsedOutput=parsedOutput)
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/parse/image.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; nothing is prepared here.

    Parameters:
        session: Current session data; not used.

    Returns:
        A `SessionUpdate` created without any fields.

    """
    return SessionUpdate()
SessionUpdateImageParse (SessionUpdate)
pydantic-model
¶
Configuration model for ImageParse.
Source code in oteapi/strategies/parse/image.py
class SessionUpdateImageParse(SessionUpdate):
    """Session update carrying the output of the image parse strategy."""

    # Required string-to-string mapping filled in by `ImageDataParseStrategy.get`.
    parsedOutput: Dict[str, str] = Field(
        ...,
        description="Parsed output from ImageParse.",
    )
parsedOutput: Dict[str, str]
pydantic-field
required
¶
Parsed output from ImageParse.
text_csv
¶
Strategy class for text/csv.
CSVParseStrategy
dataclass
¶
Parse strategy for CSV files.
Registers strategies:
("mediaType", "text/csv")
Source code in oteapi/strategies/parse/text_csv.py
@dataclass
class CSVParseStrategy:
    """Parse strategy for CSV files.

    **Registers strategies**:

    - `("mediaType", "text/csv")`

    """

    parse_config: "ResourceConfig"

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; no CSV content is read here.

        Parameters:
            session: Current session data; not used.

        Returns:
            A `SessionUpdate` created without any fields.

        """
        return SessionUpdate()

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; nothing is prepared here.

        Parameters:
            session: Current session data; not used.

        Returns:
            A `SessionUpdate` created without any fields.

        """
        return SessionUpdate()
get(self, session=None)
¶
Parse CSV.
Source code in oteapi/strategies/parse/text_csv.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; no CSV content is read here.

    Parameters:
        session: Current session data; not used.

    Returns:
        A `SessionUpdate` created without any fields.

    """
    return SessionUpdate()
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/parse/text_csv.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; nothing is prepared here.

    Parameters:
        session: Current session data; not used.

    Returns:
        A `SessionUpdate` created without any fields.

    """
    return SessionUpdate()
transformation
special
¶
celery_remote
¶
Transformation Plugin that uses the Celery framework to call remote workers.
CeleryConfig (BaseModel)
pydantic-model
¶
Celery configuration.
Source code in oteapi/strategies/transformation/celery_remote.py
class CeleryConfig(BaseModel):
    """Settings describing the Celery task to submit."""

    # Required name handed to `app.send_task`.
    task_name: str = Field(
        ...,
        description="A task name.",
    )
    # Required positional arguments for the task.
    args: list = Field(
        ...,
        description="List of arguments for the task.",
    )
CeleryRemoteStrategy
dataclass
¶
Submit job to remote Celery runner.
Registers strategies:
("transformationType", "celery/remote")
Source code in oteapi/strategies/transformation/celery_remote.py
@dataclass
class CeleryRemoteStrategy:
    """Submit job to remote Celery runner.

    **Registers strategies**:

    - `("transformationType", "celery/remote")`

    """

    transformation_config: "TransformationConfig"

    def run(self, session: "Optional[Dict[str, Any]]" = None) -> TransformationStatus:
        """Submit the configured task and report its id and current status.

        Parameters:
            session: Current session data; passed to the task as keyword
                arguments.

        Returns:
            The task id together with the status reported for it.

        """
        config = self.transformation_config.configuration
        if config is None:
            # NOTE(review): both `CeleryConfig` fields are required, so this
            # branch presumably fails validation - confirm the intended default.
            celery_config = CeleryConfig()
        else:
            celery_config = CeleryConfig(**config)

        result: "Union[AsyncResult, Any]" = app.send_task(
            celery_config.task_name, celery_config.args, kwargs=session
        )
        task_id = result.task_id
        status = AsyncResult(id=task_id, app=app)
        return TransformationStatus(id=task_id, status=status.status)

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Return an empty session update; nothing is prepared here.

        Parameters:
            session: Current session data; not used.

        Returns:
            A `SessionUpdate` created without any fields.

        """
        return SessionUpdate()

    def status(self, task_id: str) -> TransformationStatus:
        """Look up the state of a previously submitted task.

        Parameters:
            task_id: Id of the task, as returned by `run`.

        Returns:
            The task id together with its current state.

        """
        task_state = AsyncResult(id=task_id, app=app).state
        return TransformationStatus(id=task_id, status=task_state)

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "SessionUpdateCelery":
        """Return a session update with empty `data`.

        Parameters:
            session: Current session data; not used.

        Returns:
            A `SessionUpdateCelery` whose `data` is an empty dict.

        """
        # TODO: update and return global state  # pylint: disable=fixme
        return SessionUpdateCelery(data={})
get(self, session=None)
¶
Get transformation.
Source code in oteapi/strategies/transformation/celery_remote.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "SessionUpdateCelery":
    """Return a session update with empty `data`.

    Parameters:
        session: Current session data; not used.

    Returns:
        A `SessionUpdateCelery` whose `data` is an empty dict.

    """
    # TODO: update and return global state  # pylint: disable=fixme
    return SessionUpdateCelery(data={})
initialize(self, session=None)
¶
Initialize a job.
Source code in oteapi/strategies/transformation/celery_remote.py
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Return an empty session update; nothing is prepared here.

    Parameters:
        session: Current session data; not used.

    Returns:
        A `SessionUpdate` created without any fields.

    """
    return SessionUpdate()
run(self, session=None)
¶
Run a job, return a job ID.
Source code in oteapi/strategies/transformation/celery_remote.py
def run(self, session: "Optional[Dict[str, Any]]" = None) -> TransformationStatus:
    """Submit the configured task and report its id and current status.

    Parameters:
        session: Current session data; passed to the task as keyword
            arguments.

    Returns:
        The task id together with the status reported for it.

    """
    config = self.transformation_config.configuration
    if config is None:
        # NOTE(review): both `CeleryConfig` fields are required, so this
        # branch presumably fails validation - confirm the intended default.
        celery_config = CeleryConfig()
    else:
        celery_config = CeleryConfig(**config)

    result: "Union[AsyncResult, Any]" = app.send_task(
        celery_config.task_name, celery_config.args, kwargs=session
    )
    task_id = result.task_id
    status = AsyncResult(id=task_id, app=app)
    return TransformationStatus(id=task_id, status=status.status)
status(self, task_id)
¶
Get job status.
Source code in oteapi/strategies/transformation/celery_remote.py
def status(self, task_id: str) -> TransformationStatus:
    """Look up the state of a previously submitted task.

    Parameters:
        task_id: Id of the task to look up.

    Returns:
        The task id together with its current state.

    """
    task_state = AsyncResult(id=task_id, app=app).state
    return TransformationStatus(id=task_id, status=task_state)
SessionUpdateCelery (SessionUpdate)
pydantic-model
¶
Class for returning values from the Celery remote transformation strategy.
Source code in oteapi/strategies/transformation/celery_remote.py
class SessionUpdateCelery(SessionUpdate):
    """Class for returning values from the Celery remote transformation strategy."""

    # NOTE(review): this description reads as if copied from the XLSX parse
    # model; confirm what `data` is meant to hold for Celery results.
    data: Dict[str, list] = Field(
        ...,
        description="A dict with column-name/column-value pairs. The values are lists.",
    )
data: Dict[str, list]
pydantic-field
required
¶
A dict with column-name/column-value pairs. The values are lists.