file¶
Download strategy class for the `file` scheme.
FileConfig (BaseModel)
pydantic-model
¶
File-specific Configuration Data Model.
Source code in oteapi/strategies/download/file.py
class FileConfig(BaseModel):
    """Configuration options specific to the `file` download strategy.

    Both fields are optional in the incoming configuration and fall back
    to the defaults declared below.
    """

    # Selects text mode; the declared default (`False`) means bytes mode.
    text: bool = Field(
        default=False,
        description=(
            "Whether the file should be opened in text mode. If `False`, the file will"
            " be opened in bytes mode."
        ),
    )

    # Optional text encoding; `None` is the declared default.
    encoding: Optional[str] = Field(
        default=None,
        description=(
            "Encoding used when opening the file. The default is platform dependent."
        ),
    )
FileStrategy
dataclass
¶
Strategy for retrieving data from a local file.
Registers strategies:
("scheme", "file")
Source code in oteapi/strategies/download/file.py
@dataclass
@StrategyFactory.register(("scheme", "file"))
class FileStrategy:
    """Strategy for retrieving data from a local file.

    **Registers strategies**:

    - `("scheme", "file")`

    """

    resource_config: "ResourceConfig"

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> "Dict[str, Any]":
        """Initialize the strategy.

        Parameters:
            session: Optional session data. It is not read here.

        Returns:
            An empty dictionary.

        """
        return {}

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
        """Read a local file and store its content in the data cache.

        If the cache configuration carries an `accessKey` that is already
        present in the cache, the file is not read again and that key is
        returned.

        Parameters:
            session: Optional session data. It is not read here.

        Returns:
            A dictionary `{"key": <cache key>}` identifying the cached content.

        Raises:
            ValueError: If `downloadUrl` is missing, does not use the `file`
                scheme, or contains neither a host nor a path.

        """
        url = self.resource_config.downloadUrl
        if url is None or url.scheme != "file":
            raise ValueError(
                "Expected 'downloadUrl' to have scheme 'file' in the configuration."
            )

        # The host has historically been used as the file location (e.g.
        # `file://sample.json`). Keep that, but also append the URL path so
        # `file://dir/sample.json` and `file:///abs/sample.json` no longer lose
        # everything after the host.
        location = f"{url.host or ''}{url.path or ''}"
        if not location:
            raise ValueError("Expected 'downloadUrl' to contain a file path.")
        filename = Path(location).resolve()

        configuration = self.resource_config.configuration
        cache = DataCache(configuration)
        access_key = cache.config.accessKey
        if access_key and access_key in cache:
            return {"key": access_key}

        # Unknown keys are ignored by pydantic models by default, so the
        # configuration can be passed as-is. `or {}` guards against a missing
        # configuration, which cannot be unpacked with `**`.
        config = FileConfig(**(configuration or {}))
        content = (
            filename.read_text(encoding=config.encoding)
            if config.text
            else filename.read_bytes()
        )
        return {"key": cache.add(content)}
get(self, session=None)
¶
Read local file.
Source code in oteapi/strategies/download/file.py
def get(self, session: "Optional[Dict[str, Any]]" = None) -> "Dict[str, Any]":
    """Read a local file and store its content in the data cache.

    If the cache configuration carries an `accessKey` that is already
    present in the cache, the file is not read again and that key is
    returned.

    Parameters:
        session: Optional session data. It is not read here.

    Returns:
        A dictionary `{"key": <cache key>}` identifying the cached content.

    Raises:
        ValueError: If `downloadUrl` is missing, does not use the `file`
            scheme, or contains neither a host nor a path.

    """
    url = self.resource_config.downloadUrl
    if url is None or url.scheme != "file":
        raise ValueError(
            "Expected 'downloadUrl' to have scheme 'file' in the configuration."
        )

    # The host has historically been used as the file location (e.g.
    # `file://sample.json`). Keep that, but also append the URL path so
    # `file://dir/sample.json` and `file:///abs/sample.json` no longer lose
    # everything after the host.
    location = f"{url.host or ''}{url.path or ''}"
    if not location:
        raise ValueError("Expected 'downloadUrl' to contain a file path.")
    filename = Path(location).resolve()

    configuration = self.resource_config.configuration
    cache = DataCache(configuration)
    access_key = cache.config.accessKey
    if access_key and access_key in cache:
        return {"key": access_key}

    # Unknown keys are ignored by pydantic models by default, so the
    # configuration can be passed as-is. `or {}` guards against a missing
    # configuration, which cannot be unpacked with `**`.
    config = FileConfig(**(configuration or {}))
    content = (
        filename.read_text(encoding=config.encoding)
        if config.text
        else filename.read_bytes()
    )
    return {"key": cache.add(content)}
initialize(self, session=None)
¶
Initialize.
Source code in oteapi/strategies/download/file.py
def initialize(
    self, session: "Optional[Dict[str, Any]]" = None
) -> "Dict[str, Any]":
    """Set up the strategy; this implementation has nothing to prepare.

    The `session` argument is accepted but not read, and a new empty
    dictionary is returned on every call.
    """
    return dict()