Skip to content

application_json

Strategy class for application/json.

JSONConfig

Bases: AttrDict

JSON parse-specific Configuration Data Model.

Source code in oteapi/strategies/parse/application_json.py
17
18
19
20
21
22
23
class JSONConfig(AttrDict):
    """JSON parse-specific Configuration Data Model."""

    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configurations for the data cache for storing the downloaded file content.",
    )

JSONDataParseStrategy

Parse strategy for JSON.

Registers strategies:

  • ("mediaType", "application/json")
Source code in oteapi/strategies/parse/application_json.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@dataclass
class JSONDataParseStrategy:
    """Parse strategy for JSON.

    **Registers strategies**:

    - `("mediaType", "application/json")`

    """

    parse_config: JSONResourceConfig

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize."""
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateJSONParse:
        """Parse json."""
        downloader = create_strategy("download", self.parse_config)
        output = downloader.get()
        cache = DataCache(self.parse_config.configuration.datacache_config)
        content = cache.get(output["key"])

        if isinstance(content, dict):
            return SessionUpdateJSONParse(content=content)
        return SessionUpdateJSONParse(content=json.loads(content))

get(session=None)

Parse json.

Source code in oteapi/strategies/parse/application_json.py
61
62
63
64
65
66
67
68
69
70
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateJSONParse:
    """Parse json."""
    downloader = create_strategy("download", self.parse_config)
    output = downloader.get()
    cache = DataCache(self.parse_config.configuration.datacache_config)
    content = cache.get(output["key"])

    if isinstance(content, dict):
        return SessionUpdateJSONParse(content=content)
    return SessionUpdateJSONParse(content=json.loads(content))

initialize(session=None)

Initialize.

Source code in oteapi/strategies/parse/application_json.py
57
58
59
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize."""
    return SessionUpdate()

JSONResourceConfig

Bases: ResourceConfig

JSON parse strategy filter config.

Source code in oteapi/strategies/parse/application_json.py
26
27
28
29
30
31
32
33
34
35
36
class JSONResourceConfig(ResourceConfig):
    """JSON parse strategy filter config."""

    mediaType: str = Field(
        "application/json",
        const=True,
        description=ResourceConfig.__fields__["mediaType"].field_info.description,
    )
    configuration: JSONConfig = Field(
        JSONConfig(), description="JSON parse strategy-specific configuration."
    )

SessionUpdateJSONParse

Bases: SessionUpdate

Class for returning values from JSON Parse.

Source code in oteapi/strategies/parse/application_json.py
39
40
41
42
class SessionUpdateJSONParse(SessionUpdate):
    """Class for returning values from JSON Parse."""

    content: dict = Field(..., description="Content of the JSON document.")