Skip to content

sftp

Strategy class for sftp/ftp

AnyFtpUrl

Bases: AnyUrl

A (S)FTP URL model.

Source code in oteapi/strategies/download/sftp.py
18
19
20
21
class AnyFtpUrl(AnyUrl):
    """URL model for FTP and SFTP resources.

    Declares `ftp` and `sftp` as the allowed URL schemes.
    """

    allowed_schemes = {"sftp", "ftp"}

SFTPConfig

Bases: AttrDict

(S)FTP-specific Configuration Data Model.

Source code in oteapi/strategies/download/sftp.py
24
25
26
27
28
29
30
class SFTPConfig(AttrDict):
    """Configuration data model specific to the (S)FTP download strategy."""

    # Optional data cache settings; left as None when not supplied.
    datacache_config: Optional[DataCacheConfig] = Field(
        default=None,
        description=(
            "Configurations for the data cache for storing the downloaded file "
            "content."
        ),
    )

SFTPResourceConfig

Bases: ResourceConfig

(S)FTP download strategy filter config.

Source code in oteapi/strategies/download/sftp.py
33
34
35
36
37
38
39
40
41
class SFTPResourceConfig(ResourceConfig):
    """Resource configuration consumed by the (S)FTP download strategy."""

    # Required field: the location of the file to fetch.
    downloadUrl: AnyFtpUrl = Field(  # type: ignore[assignment]
        ...,
        description="The (S)FTP URL, which will be downloaded.",
    )
    # Strategy-specific settings; defaults to an `SFTPConfig` built without
    # arguments.
    configuration: SFTPConfig = Field(
        default=SFTPConfig(),
        description="(S)FTP download strategy-specific configuration.",
    )

SFTPStrategy

Strategy for retrieving data via sftp.

Registers strategies:

  • ("scheme", "ftp")
  • ("scheme", "sftp")
Source code in oteapi/strategies/download/sftp.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@dataclass
class SFTPStrategy:
    """Strategy for retrieving data via sftp.

    **Registers strategies**:

    - `("scheme", "ftp")`
    - `("scheme", "sftp")`

    """

    download_config: SFTPResourceConfig

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize.

        Parameters:
            session: Not read by this method.

        Returns:
            A `SessionUpdate` created without arguments.

        """
        return SessionUpdate()

    def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateSFTP:
        """Download via sftp.

        If the data cache configuration names an `accessKey` that is already
        present in the cache, that key is returned and no connection is made.
        Otherwise the remote file is downloaded to a temporary local file,
        its bytes are added to the cache and the temporary file is removed.

        Parameters:
            session: Not read by this method.

        Returns:
            A `SessionUpdateSFTP` whose `key` identifies the cached content.

        """
        cache = DataCache(self.download_config.configuration.datacache_config)
        if cache.config.accessKey and cache.config.accessKey in cache:
            return SessionUpdateSFTP(key=cache.config.accessKey)

        url = self.download_config.downloadUrl

        # The URL may omit the port (None) and, depending on the pydantic
        # version, may expose it as a string. Passing None explicitly would
        # override pysftp's own default, so normalize to an int here and fall
        # back to the standard SSH/SFTP port.
        port = 22 if url.port is None else int(url.port)

        # Setup connection options
        cnopts = pysftp.CnOpts()
        # NOTE(security): clearing `hostkeys` turns off host key verification,
        # so the identity of the remote server is not checked. Kept as-is to
        # preserve existing behaviour; consider supplying known hosts instead.
        cnopts.hostkeys = None

        # open connection and store data locally
        with pysftp.Connection(
            host=url.host,
            username=url.user,
            password=url.password,
            port=port,
            cnopts=cnopts,
        ) as sftp:
            # Because of insane locking on Windows, we have to close
            # the downloaded file before adding it to the cache.
            # `delete=False` keeps the (empty) file on disk after the handle
            # is closed, so it is removed explicitly in the `finally` below.
            with NamedTemporaryFile(prefix="oteapi-sftp-", delete=False) as handle:
                localpath = Path(handle.name).resolve()
            try:
                sftp.get(url.path, localpath=localpath)
                key = cache.add(localpath.read_bytes())
            finally:
                localpath.unlink()

        return SessionUpdateSFTP(key=key)

get(session=None)

Download via sftp

Source code in oteapi/strategies/download/sftp.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateSFTP:
    """Download via sftp.

    If the data cache configuration names an `accessKey` that is already
    present in the cache, that key is returned and no connection is made.
    Otherwise the remote file is downloaded to a temporary local file,
    its bytes are added to the cache and the temporary file is removed.

    Parameters:
        session: Not read by this method.

    Returns:
        A `SessionUpdateSFTP` whose `key` identifies the cached content.

    """
    cache = DataCache(self.download_config.configuration.datacache_config)
    if cache.config.accessKey and cache.config.accessKey in cache:
        return SessionUpdateSFTP(key=cache.config.accessKey)

    url = self.download_config.downloadUrl

    # The URL may omit the port (None) and, depending on the pydantic
    # version, may expose it as a string. Passing None explicitly would
    # override pysftp's own default, so normalize to an int here and fall
    # back to the standard SSH/SFTP port.
    port = 22 if url.port is None else int(url.port)

    # Setup connection options
    cnopts = pysftp.CnOpts()
    # NOTE(security): clearing `hostkeys` turns off host key verification,
    # so the identity of the remote server is not checked. Kept as-is to
    # preserve existing behaviour; consider supplying known hosts instead.
    cnopts.hostkeys = None

    # open connection and store data locally
    with pysftp.Connection(
        host=url.host,
        username=url.user,
        password=url.password,
        port=port,
        cnopts=cnopts,
    ) as sftp:
        # Because of insane locking on Windows, we have to close
        # the downloaded file before adding it to the cache.
        # `delete=False` keeps the (empty) file on disk after the handle
        # is closed, so it is removed explicitly in the `finally` below.
        with NamedTemporaryFile(prefix="oteapi-sftp-", delete=False) as handle:
            localpath = Path(handle.name).resolve()
        try:
            sftp.get(url.path, localpath=localpath)
            key = cache.add(localpath.read_bytes())
        finally:
            localpath.unlink()

    return SessionUpdateSFTP(key=key)

initialize(session=None)

Initialize.

Source code in oteapi/strategies/download/sftp.py
63
64
65
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize the strategy.

    The `session` argument is not read.

    Returns:
        A `SessionUpdate` created without arguments.

    """
    update = SessionUpdate()
    return update

SessionUpdateSFTP

Bases: SessionUpdate

Class for returning values from Download SFTP strategy.

Source code in oteapi/strategies/download/sftp.py
44
45
46
47
class SessionUpdateSFTP(SessionUpdate):
    """Session update model returned by the SFTP download strategy."""

    # Required field.
    key: str = Field(
        ...,
        description="Key to access the data in the cache.",
    )