Skip to content

https

Download strategy class for http/https

HTTPSConfig

Bases: AttrDict

HTTP(S)-specific Configuration Data Model.

Source code in oteapi/strategies/download/https.py
16
17
18
19
20
21
22
class HTTPSConfig(AttrDict):
    """HTTP(S)-specific Configuration Data Model."""

    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configurations for the data cache for storing the downloaded file content.",
    )

HTTPSResourceConfig

Bases: ResourceConfig

HTTP(S) download strategy filter config.

Source code in oteapi/strategies/download/https.py
25
26
27
28
29
30
31
32
33
class HTTPSResourceConfig(ResourceConfig):
    """HTTP(S) download strategy filter config."""

    downloadUrl: AnyHttpUrl = Field(  # type: ignore[assignment]
        ..., description="The HTTP(S) URL, which will be downloaded."
    )
    configuration: HTTPSConfig = Field(
        HTTPSConfig(), description="HTTP(S) download strategy-specific configuration."
    )

HTTPSStrategy

Strategy for retrieving data via http.

Registers strategies:

  • ("scheme", "http")
  • ("scheme", "https")
Source code in oteapi/strategies/download/https.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@dataclass
class HTTPSStrategy:
    """Strategy for retrieving data via http.

    **Registers strategies**:

    - `("scheme", "http")`
    - `("scheme", "https")`

    """

    download_config: HTTPSResourceConfig

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize."""
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateHTTPS:
        """Download via http/https and store on local cache."""
        cache = DataCache(self.download_config.configuration.datacache_config)
        if cache.config.accessKey and cache.config.accessKey in cache:
            key = cache.config.accessKey
        else:
            req = requests.get(
                self.download_config.downloadUrl,
                allow_redirects=True,
                timeout=(3, 27),  # timeout: (connect, read) in seconds
            )
            key = cache.add(req.content)

        return SessionUpdateHTTPS(key=key)

get(session=None)

Download via http/https and store on local cache.

Source code in oteapi/strategies/download/https.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateHTTPS:
    """Download via http/https and store on local cache."""
    cache = DataCache(self.download_config.configuration.datacache_config)
    if cache.config.accessKey and cache.config.accessKey in cache:
        key = cache.config.accessKey
    else:
        req = requests.get(
            self.download_config.downloadUrl,
            allow_redirects=True,
            timeout=(3, 27),  # timeout: (connect, read) in seconds
        )
        key = cache.add(req.content)

    return SessionUpdateHTTPS(key=key)

initialize(session=None)

Initialize.

Source code in oteapi/strategies/download/https.py
55
56
57
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize."""
    return SessionUpdate()

SessionUpdateHTTPS

Bases: SessionUpdate

Class for returning values from Download HTTPS strategy.

Source code in oteapi/strategies/download/https.py
36
37
38
39
class SessionUpdateHTTPS(SessionUpdate):
    """Class for returning values from Download HTTPS strategy."""

    key: str = Field(..., description="Key to access the data in the cache.")