This class is available to import from `oteapi.triplestore`, e.g.:
from oteapi.triplestore import TripleStore
Init must initialize the triple store connection
Parameters:
Name |
Type |
Description |
Default |
config |
Union[TripleStoreConfig, Dict[str, Any]]
|
RDF triple-store
configuration. |
required
|
Attributes:
Name |
Type |
Description |
config |
TripleStoreConfig
|
The RDF triple-store configuration. |
Source code in oteapi/triplestore/triplestore.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class TripleStore:
    """
    This class is available to import from `oteapi.triplestore`, e.g.:

    ```python
    from oteapi.triplestore import TripleStore
    ```

    Init must initialize the triple store connection

    Args:
        config (Union[TripleStoreConfig, Dict[str, Any]]): RDF triple-store
            configuration.

    Attributes:
        config (TripleStoreConfig): The RDF triple-store configuration.
        server (AllegroGraphServer): Server handle created from the host, port
            and credentials in `config`; `get()` opens its sessions on it.

    Raises:
        TypeError: If `config` is neither a `TripleStoreConfig` nor a
            dictionary.

    """

    def __init__(
        self,
        config: "Union[TripleStoreConfig, Dict[str, Any]]",
    ) -> None:
        # Normalize the input so that `self.config` is always a data model.
        if isinstance(config, (dict, AttrDict)):
            self.config = TripleStoreConfig(**config)
        elif isinstance(config, TripleStoreConfig):
            self.config = config
        else:
            raise TypeError(
                "config should be either a `TripleStoreConfig` data model or a "
                "dictionary."
            )

        self.server = AllegroGraphServer(
            self.config.agraphHost,
            self.config.agraphPort,
            self.config.user.get_secret_value(),  # type: ignore [union-attr]
            self.config.password.get_secret_value(),  # type: ignore [union-attr]
        )

    def add(self, triples: RDFTriple) -> None:
        """Add triples to the triplestore.

        Args:
            triples: Triples in Turtle format (`<s> <p> <o> .`).

        """
        # The connection is used as a context manager, so it is closed when the
        # `with` block exits; no explicit `close()` call is needed.
        with ag_connect(
            self.config.repositoryName,
            host=self.config.agraphHost,
            port=self.config.agraphPort,
            user=self.config.user.get_secret_value(),  # type: ignore [union-attr]
            password=self.config.password.get_secret_value(),  # type: ignore [union-attr] # pylint: disable=line-too-long
        ) as connection:
            connection.addData(triples)

    def get(self, sparql_query: str) -> "Any":
        """Return the query result.

        Args:
            sparql_query: The SPARQL search query.

        Returns:
            A list with one dictionary per result row, holding the string
            values of whichever of the `s`, `p` and `o` variables are bound.
            If the request fails with a `RequestError`, a dictionary of the
            form `{"Error": error}` is returned instead.

        """
        connection = self.server.openSession(
            reason("<" + str(self.config.repositoryName) + ">")
        )
        try:
            tuple_query = connection.prepareTupleQuery(query=sparql_query)
            response = []
            with tuple_query.evaluate(output_format=TupleFormat.JSON) as results:
                for result in results:
                    # A variable is treated as bound when it shows up in the
                    # row's string representation.
                    rendered = str(result)
                    triple = {}
                    for name in ("s", "p", "o"):
                        if f"'{name}': " in rendered:
                            triple[name] = str(result.getValue(name))
                    response.append(triple)
            return response
        except RequestError as error:
            return {"Error": error}
        finally:
            # Release the session on every path, including failed requests.
            connection.close()

    def update_delete(self, sparql_query: str) -> None:
        """Remove and update triples.

        Useful for modifying and cleaning up mappings.

        Args:
            sparql_query: The sparql update/delete query.

        """
        # The connection is used as a context manager, so it is closed when the
        # `with` block exits; no explicit `close()` call is needed.
        with ag_connect(
            self.config.repositoryName,
            host=self.config.agraphHost,
            port=self.config.agraphPort,
            user=self.config.user.get_secret_value(),  # type: ignore [union-attr]
            password=self.config.password.get_secret_value(),  # type: ignore [union-attr] # pylint: disable=line-too-long
        ) as connection:
            update_query = connection.prepareUpdate(query=sparql_query)
            update_query.evaluate()
|
add(triples)
Add triples to the triplestore.
Parameters:
Name |
Type |
Description |
Default |
triples |
RDFTriple
|
Triples in Turtle format (`<s> <p> <o> .`). |
required
|
Source code in oteapi/triplestore/triplestore.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def add(self, triples: RDFTriple) -> None:
    """Add triples to the triplestore.

    Args:
        triples: Triples in Turtle format (`<s> <p> <o> .`).

    """
    # The connection is used as a context manager, so it is closed when the
    # `with` block exits; no explicit `close()` call is needed.
    with ag_connect(
        self.config.repositoryName,
        host=self.config.agraphHost,
        port=self.config.agraphPort,
        user=self.config.user.get_secret_value(),  # type: ignore [union-attr]
        password=self.config.password.get_secret_value(),  # type: ignore [union-attr] # pylint: disable=line-too-long
    ) as connection:
        connection.addData(triples)
|
get(sparql_query)
Return the query result.
Parameters:
Name |
Type |
Description |
Default |
sparql_query |
str
|
The SPARQL search query. |
required
|
Returns:
Type |
Description |
Any
|
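A list with one dictionary per result row, holding the string values of whichever of the `s`, `p` and `o` variables are bound; if the request fails, a dictionary of the form `{"Error": error}` is returned instead. |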
Source code in oteapi/triplestore/triplestore.py
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def get(self, sparql_query: str) -> "Any":
    """Return the query result.

    Args:
        sparql_query: The SPARQL search query.

    Returns:
        A list with one dictionary per result row, holding the string values
        of whichever of the `s`, `p` and `o` variables are bound. If the
        request fails with a `RequestError`, a dictionary of the form
        `{"Error": error}` is returned instead.

    """
    connection = self.server.openSession(
        reason("<" + str(self.config.repositoryName) + ">")
    )
    try:
        tuple_query = connection.prepareTupleQuery(query=sparql_query)
        response = []
        with tuple_query.evaluate(output_format=TupleFormat.JSON) as results:
            for result in results:
                # A variable is treated as bound when it shows up in the
                # row's string representation.
                rendered = str(result)
                triple = {}
                for name in ("s", "p", "o"):
                    if f"'{name}': " in rendered:
                        triple[name] = str(result.getValue(name))
                response.append(triple)
        return response
    except RequestError as error:
        return {"Error": error}
    finally:
        # Release the session on every path, including failed requests.
        connection.close()
|
update_delete(sparql_query)
Remove and update triples.
Useful for modifying and cleaning up mappings.
Parameters:
Name |
Type |
Description |
Default |
sparql_query |
str
|
The sparql update/delete query. |
required
|
Returns:
Type |
Description |
None
|
Nothing is returned; the query is executed for its effect on the triplestore. |
Source code in oteapi/triplestore/triplestore.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def update_delete(self, sparql_query: str) -> None:
    """Remove and update triples.

    Useful for modifying and cleaning up mappings.

    Args:
        sparql_query: The sparql update/delete query.

    """
    # The connection is used as a context manager, so it is closed when the
    # `with` block exits; no explicit `close()` call is needed.
    with ag_connect(
        self.config.repositoryName,
        host=self.config.agraphHost,
        port=self.config.agraphPort,
        user=self.config.user.get_secret_value(),  # type: ignore [union-attr]
        password=self.config.password.get_secret_value(),  # type: ignore [union-attr] # pylint: disable=line-too-long
    ) as connection:
        update_query = connection.prepareUpdate(query=sparql_query)
        update_query.evaluate()
|