Skip to content

triplestore

An RDF triplestore using Allegrograph https://franz.com/agraph/support/documentation/current/python/api.html https://franz.com/agraph/support/documentation/current/agraph-introduction.html

Features:

  • Store mapping data (triple format)
  • AllegroGraph has a well documented python API package

TripleStore

This class is available to import from oteapi.triplestore, e.g.:

from oteapi.triplestore import TripleStore
Init must initialize the triple store connection

Parameters:

Name Type Description Default
config Union[TripleStoreConfig, Dict[str, Any]]

RDF triple-store configuration.

required

Attributes:

Name Type Description
config TripleStoreConfig

The RDF triple-store configuration.

Source code in oteapi/triplestore/triplestore.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class TripleStore:
    """
    This class is available to import from `oteapi.triplestore`, e.g.:

    ```python
    from oteapi.triplestore import TripleStore
    ```
    Init must initialize the triple store connection

    Args:
        config (Union[TripleStoreConfig, Dict[str, Any]]): RDF triple-store
            configuration.

    Attributes:
        config (TripleStoreConfig): The RDF triple-store configuration.

    """

    def __init__(
        self,
        config: "Union[TripleStoreConfig, Dict[str, Any]]",
    ) -> None:
        if isinstance(config, (dict, AttrDict)):
            self.config = TripleStoreConfig(**config)
        elif isinstance(config, TripleStoreConfig):
            self.config = config
        else:
            raise TypeError(
                "config should be either a `TripleStoreConfig` data model or a "
                "dictionary."
            )
        self.server = AllegroGraphServer(
            self.config.agraphHost,
            self.config.agraphPort,
            self.config.user.get_secret_value(),  # type: ignore [union-attr]
            self.config.password.get_secret_value(),  # type: ignore [union-attr]
        )

    def add(self, triples: RDFTriple) -> None:
        """
        Add triples to the triplestore.

        Args:
            triples: triples turtle format(<s> <o> <p>.).

        """
        with ag_connect(
            self.config.repositoryName,
            host=self.config.agraphHost,
            port=self.config.agraphPort,
            user=self.config.user.get_secret_value(),  # type: ignore [union-attr]
            password=self.config.password.get_secret_value(),  # type: ignore [union-attr]  # noqa: E501
        ) as connection:
            connection.addData(triples)
            connection.close()

    def get(self, sparql_query: str) -> "Any":
        """Return the query result.

        Args:
            sparql_query: The SPARQL search query.

        Returns:
            The output of the search query in the form of a list of RDF triples.

        """
        connection = self.server.openSession(
            reason("<" + str(self.config.repositoryName) + ">")
        )
        try:
            tuple_query = connection.prepareTupleQuery(query=sparql_query)
            response = []

            with tuple_query.evaluate(output_format=TupleFormat.JSON) as results:
                for result in results:
                    triple = {}
                    if "'s': " in str(result):
                        triple["s"] = str(result.getValue("s"))
                    if "'p': " in str(result):
                        triple["p"] = str(result.getValue("p"))
                    if "'o': " in str(result):
                        triple["o"] = str(result.getValue("o"))
                    response.append(triple)
            connection.close()
            return response
        except RequestError as error:
            return {"Error": error}

    def update_delete(self, sparql_query: str) -> None:
        """Remove and update triples.

        Useful for modifying and cleaning up mappings.

        Args:
            sparql_query: The sparql update/delete query.
        Returns:
            True if update was successful.

        """
        with ag_connect(
            self.config.repositoryName,
            host=self.config.agraphHost,
            port=self.config.agraphPort,
            user=self.config.user.get_secret_value(),  # type: ignore [union-attr]
            password=self.config.password.get_secret_value(),  # type: ignore [union-attr]  # noqa: E501
        ) as connection:
            update_query = connection.prepareUpdate(query=sparql_query)
            update_query.evaluate()
            connection.close()

add(triples)

Add triples to the triplestore.

Parameters:

Name Type Description Default
triples RDFTriple

triples turtle format(

.).

required
Source code in oteapi/triplestore/triplestore.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def add(self, triples: RDFTriple) -> None:
    """
    Add triples to the triplestore.

    Args:
        triples: triples turtle format(<s> <o> <p>.).

    """
    with ag_connect(
        self.config.repositoryName,
        host=self.config.agraphHost,
        port=self.config.agraphPort,
        user=self.config.user.get_secret_value(),  # type: ignore [union-attr]
        password=self.config.password.get_secret_value(),  # type: ignore [union-attr]  # noqa: E501
    ) as connection:
        connection.addData(triples)
        connection.close()

get(sparql_query)

Return the query result.

Parameters:

Name Type Description Default
sparql_query str

The SPARQL search query.

required

Returns:

Type Description
Any

The output of the search query in the form of a list of RDF triples.

Source code in oteapi/triplestore/triplestore.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def get(self, sparql_query: str) -> "Any":
    """Return the query result.

    Args:
        sparql_query: The SPARQL search query.

    Returns:
        The output of the search query in the form of a list of RDF triples.

    """
    connection = self.server.openSession(
        reason("<" + str(self.config.repositoryName) + ">")
    )
    try:
        tuple_query = connection.prepareTupleQuery(query=sparql_query)
        response = []

        with tuple_query.evaluate(output_format=TupleFormat.JSON) as results:
            for result in results:
                triple = {}
                if "'s': " in str(result):
                    triple["s"] = str(result.getValue("s"))
                if "'p': " in str(result):
                    triple["p"] = str(result.getValue("p"))
                if "'o': " in str(result):
                    triple["o"] = str(result.getValue("o"))
                response.append(triple)
        connection.close()
        return response
    except RequestError as error:
        return {"Error": error}

update_delete(sparql_query)

Remove and update triples.

Useful for modifying and cleaning up mappings.

Parameters:

Name Type Description Default
sparql_query str

The sparql update/delete query.

required

Returns: True if update was successful.

Source code in oteapi/triplestore/triplestore.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def update_delete(self, sparql_query: str) -> None:
    """Remove and update triples.

    Useful for modifying and cleaning up mappings.

    Args:
        sparql_query: The sparql update/delete query.
    Returns:
        True if update was successful.

    """
    with ag_connect(
        self.config.repositoryName,
        host=self.config.agraphHost,
        port=self.config.agraphPort,
        user=self.config.user.get_secret_value(),  # type: ignore [union-attr]
        password=self.config.password.get_secret_value(),  # type: ignore [union-attr]  # noqa: E501
    ) as connection:
        update_query = connection.prepareUpdate(query=sparql_query)
        update_query.evaluate()
        connection.close()