Skip to content

postgres

Strategy class for application/vnd.postgresql

PostgresConfig

Bases: AttrDict

Configuration data model for PostgresResourceStrategy.

Source code in oteapi/strategies/parse/postgres.py
13
14
15
16
17
18
19
20
21
22
class PostgresConfig(AttrDict):
    """Configuration data model for
    [`PostgresResourceStrategy`][oteapi.strategies.parse.postgres.PostgresResourceConfig].
    """

    user: Optional[str] = Field(None, description="postgres server username")
    dbname: Optional[str] = Field(None, description="postgres dbname name")
    password: Optional[str] = Field(None, description="postgres password")

    sqlquery: Optional[str] = Field("", description="A SQL query string.")

PostgresResourceConfig

Bases: ResourceConfig

Postgresql parse strategy config

Source code in oteapi/strategies/parse/postgres.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class PostgresResourceConfig(ResourceConfig):
    """Postgresql parse strategy config"""

    configuration: PostgresConfig = Field(
        PostgresConfig(),
        description=(
            "Configuration for resource. " "Values in the accessURL take precedence."
        ),
    )
    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description="Configuration options for the local data cache.",
    )

    @classmethod
    def _urlconstruct(
        cls,  # PEP8 - Always use cls for the first argument to class methods.
        scheme: Optional[str] = "",  # Schema defining link format
        user: Optional[str] = None,  # Username
        password: Optional[str] = None,  # Password
        host: Optional[str] = None,
        port: Optional[int] = None,
        path: Optional[str] = "",
        params: Optional[str] = "",
        query: Optional[str] = "",
        fragment: Optional[str] = "",
    ):
        """Construct a pydantic AnyUrl based on the given URL properties"""

        # Hostname should always be given
        if not host:
            raise ValueError("hostname must be specified")

        # Update netloc of username or username|password pair is defined
        netloc = host
        if user and not password:  # Only username is provided. OK
            netloc = f"{user}@{host}"
        elif user and password:  # Username and password is provided. OK
            netloc = f"{user}:{password}@{host}"
        else:  # Password and no username is provided. ERROR
            raise ValueError("username not provided")

        # Append port if port is defined
        netloc = netloc if not port else f"{netloc}:{port}"

        # Construct a URL from a tuple of URL-properties
        unparsed = urlunparse([scheme, netloc, path, params, query, fragment])

        # Populate and return a Pydantic URL
        return parse_obj_as(AnyUrl, unparsed)

    @root_validator
    def adjust_url(cls, values):
        """Root Validator
        Verifies configuration consistency, merge configurations
        and update the accessUrl property.
        """

        # Copy model-state into placeholders
        config = values.get("configuration")
        accessUrl = values["accessUrl"]

        # Check and merge user configuration
        user = accessUrl.user if accessUrl.user else config["user"]
        if config["user"] and user != config["user"]:
            raise ValueError("mismatching username in accessUrl and configuration")

        # Check and merge password configuration
        password = accessUrl.password if accessUrl.password else config["password"]
        if config["password"] and password != config["password"]:
            raise ValueError("mismatching password in accessUrl and configuration")

        # Check and merge database name configuration
        dbname = accessUrl.path if accessUrl.path else config["dbname"]
        if config["dbname"] and dbname != config["dbname"]:
            raise ValueError("mismatching dbname in accessUrl and configuration")

        # Reconstruct accessUrl from the updated properties
        values["accessUrl"] = cls._urlconstruct(
            scheme=accessUrl.scheme,
            host=accessUrl.host,
            port=accessUrl.port,
            user=user,
            password=password,
        )
        return values

adjust_url(values)

Root Validator Verifies configuration consistency, merge configurations and update the accessUrl property.

Source code in oteapi/strategies/parse/postgres.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
@root_validator
def adjust_url(cls, values):
    """Root Validator
    Verifies configuration consistency, merge configurations
    and update the accessUrl property.
    """

    # Copy model-state into placeholders
    config = values.get("configuration")
    accessUrl = values["accessUrl"]

    # Check and merge user configuration
    user = accessUrl.user if accessUrl.user else config["user"]
    if config["user"] and user != config["user"]:
        raise ValueError("mismatching username in accessUrl and configuration")

    # Check and merge password configuration
    password = accessUrl.password if accessUrl.password else config["password"]
    if config["password"] and password != config["password"]:
        raise ValueError("mismatching password in accessUrl and configuration")

    # Check and merge database name configuration
    dbname = accessUrl.path if accessUrl.path else config["dbname"]
    if config["dbname"] and dbname != config["dbname"]:
        raise ValueError("mismatching dbname in accessUrl and configuration")

    # Reconstruct accessUrl from the updated properties
    values["accessUrl"] = cls._urlconstruct(
        scheme=accessUrl.scheme,
        host=accessUrl.host,
        port=accessUrl.port,
        user=user,
        password=password,
    )
    return values

PostgresResourceStrategy

Resource strategy for Postgres.

Registers strategies:

  • ("accessService", "postgres")

Purpose of this strategy: Connect to a postgres DB and run a SQL query on the dbname to return all relevant rows.

Source code in oteapi/strategies/parse/postgres.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@dataclass
class PostgresResourceStrategy:
    """Resource strategy for Postgres.

    **Registers strategies**:

    - `("accessService", "postgres")`

    Purpose of this strategy: Connect to a postgres DB and run a
    SQL query on the dbname to return all relevant rows.

    """

    resource_config: PostgresResourceConfig

    def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
        """Initialize strategy."""
        return SessionUpdate()

    def get(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> SessionUpdatePostgresResource:
        """Resource Postgres query responses."""
        if session:
            self._use_filters(session)
        session = session if session else {}

        connection = create_connection(self.resource_config)
        cursor = connection.cursor()
        result = cursor.execute(self.resource_config.configuration.sqlquery).fetchall()
        connection.close()
        return SessionUpdatePostgresResource(result=result)

    def _use_filters(self, session: "Dict[str, Any]") -> None:
        """Update `config` according to filter values found in the session."""
        if "sqlquery" in session and not self.resource_config.configuration.sqlquery:
            # Use SQL query available in session
            self.resource_config.configuration.sqlquery = session["sqlquery"]

get(session=None)

Resource Postgres query responses.

Source code in oteapi/strategies/parse/postgres.py
158
159
160
161
162
163
164
165
166
167
168
169
170
def get(
    self, session: "Optional[Dict[str, Any]]" = None
) -> SessionUpdatePostgresResource:
    """Resource Postgres query responses."""
    if session:
        self._use_filters(session)
    session = session if session else {}

    connection = create_connection(self.resource_config)
    cursor = connection.cursor()
    result = cursor.execute(self.resource_config.configuration.sqlquery).fetchall()
    connection.close()
    return SessionUpdatePostgresResource(result=result)

initialize(session=None)

Initialize strategy.

Source code in oteapi/strategies/parse/postgres.py
154
155
156
def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
    """Initialize strategy."""
    return SessionUpdate()

SessionUpdatePostgresResource

Bases: SessionUpdate

Configuration model for PostgresResource.

Source code in oteapi/strategies/parse/postgres.py
133
134
135
136
class SessionUpdatePostgresResource(SessionUpdate):
    """Configuration model for PostgresResource."""

    result: list = Field(..., description="List of results from the query.")

create_connection(resource_config)

Create a dbname connection to Postgres dbname.

Parameters:

Name Type Description Default
resource_config PostgresResourceConfig

A dictionary providing everything needed for a psycopg connection configuration

required

Raises:

Type Description
Error

If a DB connection cannot be made.

Returns:

Type Description
Connection

Connection object.

Source code in oteapi/strategies/parse/postgres.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def create_connection(resource_config: PostgresResourceConfig) -> psycopg.Connection:
    """Create a dbname connection to Postgres dbname.

    Parameters:
        resource_config: A dictionary providing everything needed for a psycopg
                     connection configuration

    Raises:
        psycopg.Error: If a DB connection cannot be made.

    Returns:
        Connection object.

    """
    try:
        return psycopg.connect(resource_config.accessUrl)
    except psycopg.Error as exc:
        raise psycopg.Error("Could not connect to given Postgres DB.") from exc