utils

Utility functions.

as_python(value)

Converts value to a native Python representation.

If value is a Literal, its Python representation will be returned. If value is a string, it will first be converted to a Literal, before its Python representation is returned. Otherwise, value will be returned as-is.

Source code in tripper/utils.py
def as_python(value: "Any") -> "Any":
    """Converts `value` to a native Python representation.

    If `value` is a Literal, its Python representation will be returned.
    If `value` is a string, it will first be converted to a Literal, before
    its Python representation is returned.
    Otherwise, `value` will be returned as-is.
    """
    if isinstance(value, Literal):
        return value.to_python()
    if isinstance(value, str):
        return parse_literal(value).to_python()
    return value

bnode_iri(prefix='', source='', length=5)

Returns a new IRI for a blank node.

Parameters:

Name Type Description Default
prefix str

A prefix to insert between "_:" and the hash.

''
source str

A unique string that the returned bnode will be a hash of. The default is to generate a random hash.

''
length int

The number of bytes in the hash. The default length of 5 is a compromise between readability (10 characters) and safety (corresponding to about 1e12 possibilities). You can change it to 16 to get 128 bits, corresponding to the uniqueness of UUIDs. It makes no sense to go beyond 32, because that is the maximum of the underlying shake_128 algorithm.

5

Returns:

Type Description
str

A new bnode IRI of the form "_:<prefix><hash>", where <prefix> is prefix and <hash> is a hex-encoded hash of source.

Source code in tripper/utils.py
def bnode_iri(prefix: str = "", source: str = "", length: int = 5) -> str:
    """Returns a new IRI for a blank node.

    Parameters:
        prefix: A prefix to insert between "_:" and the hash.
        source: A unique string that the returned bnode will be a hash of.
            The default is to generate a random hash.
        length: The number of bytes in the hash.  The default length of 5
            is a compromise between readability (10 characters) and safety
            (corresponding to about 1e12 possibilities).  You can change it
            to 16 to get 128 bits, corresponding to the uniqueness of UUIDs.
            It makes no sense to go beyond 32, because that is the maximum
            of the underlying shake_128 algorithm.

    Returns:
        A new bnode IRI of the form "_:<prefix><hash>", where `<prefix>`
        is `prefix` and `<hash>` is a hex-encoded hash of `source`.
    """
    if source:
        hash = hashlib.shake_128(source.encode()).hexdigest(length)
    else:
        # From Python 3.9 we can use random.randbytes(length).hex()
        # Zero-pad each byte so the hash always has 2 * length characters.
        hash = "".join(
            f"{random.randint(0, 255):02x}" for i in range(length)  # nosec
        )
    return f"_:{prefix}{hash}"
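
The relation between length and the size of the hash can be checked with the standard library alone. The sketch below is not tripper code: it only repeats the `hashlib.shake_128` call from the listing above. The helper name `hashed_bnode` and the example source string are made up for illustration.

```python
import hashlib


def hashed_bnode(prefix: str, source: str, length: int = 5) -> str:
    """Hypothetical stand-in for the `source` branch of bnode_iri()."""
    # hexdigest(length) returns `length` bytes encoded as 2 * length hex digits
    digest = hashlib.shake_128(source.encode()).hexdigest(length)
    return f"_:{prefix}{digest}"


iri = hashed_bnode("b", "http://example.com/resource")
same = hashed_bnode("b", "http://example.com/resource")
longer = hashed_bnode("b", "http://example.com/resource", length=16)

print(len(iri))     # "_:" (2) + prefix (1) + 10 hex digits
print(iri == same)  # the same source always gives the same bnode
print(len(longer))  # "_:" (2) + prefix (1) + 32 hex digits
```

Because the hash is derived from source, calling the function twice with the same source yields the same blank node, which is useful when a stable identifier is wanted.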

check_function(func, s, exceptions)

Helper function returning True if func(s) does not raise an exception.

False is returned if func(s) raises an exception listed in exceptions. Otherwise the exception is propagated.

Source code in tripper/utils.py
def check_function(func: "Callable", s: str, exceptions) -> bool:
    """Helper function returning True if `func(s)` does not raise an exception.

    False is returned if `func(s)` raises an exception listed in `exceptions`.
    Otherwise the exception is propagated.
    """
    # Note that the missing type hint on `exceptions` is deliberate, see
    # https://peps.python.org/pep-0484/#exceptions
    try:
        func(s)
    except exceptions:
        return False
    return True
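
A short usage sketch may make the three possible outcomes concrete. The function body is copied from the listing above so that the example runs without tripper installed; the inputs are arbitrary.

```python
from typing import Callable


def check_function(func: Callable, s: str, exceptions) -> bool:
    """Copy of the function above, repeated so the example is standalone."""
    try:
        func(s)
    except exceptions:
        return False
    return True


ok = check_function(float, "3.14", ValueError)   # float("3.14") succeeds
bad = check_function(float, "abc", ValueError)   # ValueError is listed
print(ok, bad)

# An exception that is not listed in `exceptions` reaches the caller.
try:
    check_function(int, None, ValueError)  # int(None) raises TypeError
    propagated = False
except TypeError:
    propagated = True
print(propagated)
```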

en(value)

Convenience function that returns value as a plain English literal.

Equivalent to Literal(value, lang="en").

Source code in tripper/utils.py
def en(value) -> "Literal":  # pylint: disable=invalid-name
    """Convenience function that returns value as a plain English literal.

    Equivalent to ``Literal(value, lang="en")``.
    """
    return Literal(value, lang="en")

extend_namespace(namespace, triplestore)

Extend a namespace with additional known names.

This only makes sense if the namespace was created with label_annotations or check set to true.

Parameters:

Name Type Description Default
namespace Namespace

The namespace to extend.

required
triplestore Union[Triplestore, str, Path, dict]

Source from which to extend the namespace. It can be one of the following types:

- Triplestore: triplestore object to read from.
- str: URL to a triplestore to read from. May also be a path to a local file to read from.
- Path: path to a local file to read from.
- dict: dict mapping new IRI names to their corresponding IRIs.

required
Source code in tripper/utils.py
def extend_namespace(
    namespace: Namespace,
    triplestore: "Union[Triplestore, str, Path, dict]",
):
    """Extend a namespace with additional known names.

    This only makes sense if the namespace was created with
    `label_annotations` or `check` set to true.

    Arguments:
        namespace: The namespace to extend.
        triplestore: Source from which to extend the namespace. It can be
            one of the following types:
              - Triplestore: triplestore object to read from
              - str: URL to a triplestore to read from.  May also be a
                path to a local file to read from
              - Path: path to a local file to read from
              - dict: dict mapping new IRI names to their corresponding IRIs
    """
    if namespace._iris is None:  # pylint: disable=protected-access
        raise TypeError(
            "only namespaces created with `label_annotations` or `check` set "
            "to true can be extended"
        )
    if isinstance(triplestore, dict):
        namespace._iris.update(triplestore)  # pylint: disable=protected-access
    else:
        namespace._update_iris(  # pylint: disable=protected-access
            triplestore, reload=True
        )

function_id(func, length=4)

Return a checksum for function func.

The returned object is a string of hexadecimal digits.

length is the number of bytes in the returned checksum. Since the current implementation is based on the shake_128 algorithm, it makes no sense to set length larger than 32 bytes.

Source code in tripper/utils.py
def function_id(func: "Callable", length: int = 4) -> str:
    """Return a checksum for function `func`.

    The returned object is a string of hexadecimal digits.

    `length` is the number of bytes in the returned checksum.  Since
    the current implementation is based on the shake_128 algorithm,
    it makes no sense to set `length` larger than 32 bytes.
    """
    source = inspect.getsource(func)
    doc = func.__doc__ if func.__doc__ else ""
    return hashlib.shake_128(  # pylint: disable=too-many-function-args
        (source + doc).encode()
    ).hexdigest(length)

infer_iri(obj)

Return IRI of the individual that stands for object obj.

Source code in tripper/utils.py
def infer_iri(obj):
    """Return IRI of the individual that stands for object `obj`."""

    # Please note that tripper depends on neither DLite nor Pydantic.
    # Hence neither of these packages is imported.  However, due to duck
    # typing, infer_iri() is still able to recognise DLite and Pydantic
    # objects and infer their IRIs.

    if isinstance(obj, str):
        iri = obj
    elif hasattr(obj, "uri") and isinstance(obj.uri, str):
        # dlite.Metadata or dataclass (or instance with uri)
        iri = obj.uri
    elif hasattr(obj, "uuid") and obj.uuid:
        # dlite.Instance or dataclass
        iri = str(obj.uuid)
    elif hasattr(obj, "schema") and callable(obj.schema):
        # pydantic.BaseModel
        if hasattr(obj, "identity") and isinstance(obj.identity, str):
            # soft7 pydantic model
            iri = obj.identity
        else:
            # pydantic instance
            schema = obj.schema()
            properties = schema["properties"]
            if "uri" in properties and isinstance(properties["uri"], str):
                iri = properties["uri"]
            if "identity" in properties and isinstance(
                properties["identity"], str
            ):
                iri = properties["identity"]
            if "uuid" in properties and properties["uuid"]:
                iri = str(properties["uuid"])
    else:
        raise TypeError(f"cannot infer IRI from object: {obj!r}")
    return str(iri)
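
The duck typing described in the comments can be illustrated without DLite or Pydantic. The sketch below is a simplified stand-in that covers only the string, `uri` and `uuid` branches; the classes `WithUri` and `WithUuid` and the function name `infer_iri_sketch` are hypothetical and not part of tripper.

```python
from dataclasses import dataclass
from uuid import UUID


@dataclass
class WithUri:
    """Hypothetical object exposing a string `uri` attribute."""

    uri: str


@dataclass
class WithUuid:
    """Hypothetical object exposing only a `uuid` attribute."""

    uuid: UUID


def infer_iri_sketch(obj) -> str:
    """Simplified stand-in: only the str, `uri` and `uuid` cases."""
    if isinstance(obj, str):
        return obj
    if hasattr(obj, "uri") and isinstance(obj.uri, str):
        return obj.uri
    if hasattr(obj, "uuid") and obj.uuid:
        return str(obj.uuid)
    raise TypeError(f"cannot infer IRI from object: {obj!r}")


from_str = infer_iri_sketch("http://example.com/a")
from_uri = infer_iri_sketch(WithUri("http://example.com/b"))
from_uuid = infer_iri_sketch(
    WithUuid(UUID("12345678-1234-5678-1234-567812345678"))
)
print(from_str, from_uri, from_uuid)
```

No base class is checked anywhere: any object with a matching attribute is accepted, which is why the real function can recognise DLite and Pydantic objects without importing those packages.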

parse_literal(literal)

Parse Python object literal and return it as an instance of Literal.

The main difference between this function and the Literal constructor is that this function correctly interprets n3-encoded literal strings.

Source code in tripper/utils.py
def parse_literal(literal: "Any") -> "Any":
    """Parse Python object `literal` and return it as an instance of Literal.

    The main difference between this function and the Literal constructor
    is that this function correctly interprets n3-encoded literal strings.
    """
    # pylint: disable=invalid-name,too-many-branches,too-many-return-statements
    lang, datatype = None, None

    if isinstance(literal, Literal):
        return literal

    if hasattr(literal, "lang"):
        lang = literal.lang
    elif hasattr(literal, "language"):
        lang = literal.language

    if (
        not lang
        and hasattr(literal, "datatype")
        and literal.datatype is not None
    ):
        datatype = str(literal.datatype)

    # This will handle rdflib literals correctly and probably most other
    # literal representations as well.
    if hasattr(literal, "value"):
        return Literal(literal.value, lang=lang, datatype=datatype)

    if not isinstance(literal, str):
        if isinstance(literal, tuple(Literal.datatypes)):
            return Literal(
                literal,
                lang=lang,
                datatype=Literal.datatypes.get(type(literal))[
                    0
                ],  # type: ignore
            )
        raise TypeError(f"unsupported literal type: {type(literal)}")

    if hasattr(literal, "n3") and callable(literal.n3):
        return parse_literal(literal.n3())

    match = re.match(r'^\s*("""(.*)"""|"(.*)")\s*$', literal, flags=re.DOTALL)
    if match:
        _, v1, v2 = match.groups()
        value, datatype = v1 if v1 else v2, XSD.string
    else:
        match = re.match(
            r'^\s*("""(.*)"""|"(.*)")\^\^(<([^>]+)>|([^<].*))\s*$',
            literal,
            flags=re.DOTALL,
        )
        if match:
            _, v1, v2, _, d1, d2 = match.groups()
            value = v1 if v1 else v2
            datatype = d1 if d1 else d2
        else:
            match = re.match(
                r'^\s*("""(.*)"""|"(.*)")@(.*)\s*$', literal, flags=re.DOTALL
            )
            if match:
                _, v1, v2, lang = match.groups()
                value = v1 if v1 else v2
            else:
                value = literal

    if lang or datatype:
        if datatype:
            types = {}
            for pytype, datatypes in Literal.datatypes.items():
                types.update({t: pytype for t in datatypes})
            type_ = types.get(datatype, str)
            if type_ is bool and value in ("False", "false", "0", 0, False):
                return Literal(False)
            try:
                value = type_(value)
            except TypeError:
                pass
        return Literal(value, lang=lang, datatype=datatype)

    for type_, datatypes in Literal.datatypes.items():
        if type_ is not bool:
            try:
                return Literal(
                    type_(literal), lang=lang, datatype=datatypes[0]
                )
            except (ValueError, TypeError):
                pass

    raise ValueError(f'cannot parse literal "{literal}"')
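
To see how the three regular expressions in the listing above take an n3-encoded string apart, they can be run in isolation. This is only a sketch: the helper `split_n3` is hypothetical, it returns plain tuples instead of Literal objects, and it uses the full XSD string IRI where the source uses `XSD.string`.

```python
import re

# The three patterns used by parse_literal(), tried in the same order.
PLAIN = r'^\s*("""(.*)"""|"(.*)")\s*$'
TYPED = r'^\s*("""(.*)"""|"(.*)")\^\^(<([^>]+)>|([^<].*))\s*$'
LANG = r'^\s*("""(.*)"""|"(.*)")@(.*)\s*$'

XSD_STRING = "http://www.w3.org/2001/XMLSchema#string"


def split_n3(text: str):
    """Hypothetical helper: return (value, lang, datatype) or None."""
    match = re.match(PLAIN, text, flags=re.DOTALL)
    if match:
        _, v1, v2 = match.groups()
        return (v1 if v1 else v2, None, XSD_STRING)
    match = re.match(TYPED, text, flags=re.DOTALL)
    if match:
        _, v1, v2, _, d1, d2 = match.groups()
        return (v1 if v1 else v2, None, d1 if d1 else d2)
    match = re.match(LANG, text, flags=re.DOTALL)
    if match:
        _, v1, v2, lang = match.groups()
        return (v1 if v1 else v2, lang, None)
    return None  # not n3-encoded; parse_literal() falls back to type guessing


print(split_n3('"hello"'))
print(split_n3('"42"^^<http://www.w3.org/2001/XMLSchema#integer>'))
print(split_n3('"hei"@no'))
print(split_n3("plain text"))
```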

parse_object(obj)

Applies heuristics to parse RDF object obj to an IRI or literal.

The following heuristics are applied (in the given order):

- If obj is a Literal, it is returned.
- If obj is a string and
  - starts with "_:", it is assumed to be a blank node and returned.
  - starts with a scheme, it is assumed to be an IRI and returned.
  - is "true" or "false", it is returned as an xsd:boolean literal.
  - can be converted to an int, float or datetime, it is returned converted to a literal of the corresponding type.
  - is a valid n3 representation, it is returned as the given type.
  - otherwise, it is returned as an xsd:string literal.
- Otherwise, a ValueError is raised.

Returns: A string if obj is considered to be an IRI, otherwise a literal.

Source code in tripper/utils.py
def parse_object(obj: "Union[str, Literal]") -> "Union[str, Any]":
    """Applies heuristics to parse RDF object `obj` to an IRI or literal.

    The following heuristics are applied (in the given order):
    - If `obj` is a Literal, it is returned.
    - If `obj` is a string and
      - starts with "_:", it is assumed to be a blank node and returned.
      - starts with a scheme, it is assumed to be an IRI and returned.
      - is "true" or "false", it is returned as an xsd:boolean literal.
      - can be converted to an int, float or datetime, it is returned
        converted to a literal of the corresponding type.
      - is a valid n3 representation, it is returned as the given type.
      - otherwise, it is returned as an xsd:string literal.
    - Otherwise, a ValueError is raised.

    Returns:
        A string if `obj` is considered to be an IRI, otherwise a
        literal.
    """
    # pylint: disable=too-many-return-statements
    if isinstance(obj, Literal):
        return obj
    if isinstance(obj, str):
        if obj.startswith("_:") or re.match(r"^[a-z]+://", obj):  # IRI
            return obj
        if obj in ("true", "false"):  # boolean
            return Literal(obj, datatype=XSD.boolean)
        if re.match(r"^\s*[+-]?\d+\s*$", obj):  # integer
            return Literal(obj, datatype=XSD.integer)
        if check_function(float, obj, ValueError):  #  float
            return Literal(obj, datatype=XSD.double)
        if check_function(
            datetime.datetime.fromisoformat, obj, ValueError
        ):  #  datetime
            return Literal(obj, datatype=XSD.dateTime)
        return parse_literal(obj)
    raise ValueError("`obj` should be a literal or a string.")
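
The order of the checks matters, since a string such as "42" would also pass the float test. The sketch below mirrors the tests from the listing above using only the standard library and returns the name of the branch taken rather than a Literal. The function `classify_object` and its return labels are hypothetical.

```python
import datetime
import re


def classify_object(obj: str) -> str:
    """Hypothetical helper naming the branch parse_object() would take."""
    if obj.startswith("_:") or re.match(r"^[a-z]+://", obj):
        return "iri"
    if obj in ("true", "false"):
        return "xsd:boolean"
    if re.match(r"^\s*[+-]?\d+\s*$", obj):
        return "xsd:integer"
    # float is tried before datetime, as in the source
    for func, name in (
        (float, "xsd:double"),
        (datetime.datetime.fromisoformat, "xsd:dateTime"),
    ):
        try:
            func(obj)
        except ValueError:
            continue
        return name
    return "parse_literal"  # the remaining cases are left to parse_literal()


for text in ("_:b0", "http://example.com/a", "true", "42", "3.14",
             "2024-01-01T12:00:00", "hello"):
    print(f"{text!r}: {classify_object(text)}")
```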

random_string(length=8)

Return a random string of the given length.

Source code in tripper/utils.py
def random_string(length=8):
    """Return a random string of the given length."""
    letters = string.ascii_letters + string.digits
    return "".join(random.choice(letters) for i in range(length))  # nosec

split_iri(iri)

Split iri into namespace and name parts and return them as a tuple.

Parameters:

Name Type Description Default
iri str

The IRI to be split.

required

Returns:

Type Description
Tuple[str, str]

The IRI split into its namespace and name parts.

Source code in tripper/utils.py
def split_iri(iri: str) -> "Tuple[str, str]":
    """Split iri into namespace and name parts and return them as a tuple.

    Parameters:
        iri: The IRI to be split.

    Returns:
        The IRI split into its namespace and name parts.

    """
    if "#" in iri:
        namespace, name = iri.rsplit("#", 1)
        return f"{namespace}#", name

    if "/" in iri:
        namespace, name = iri.rsplit("/", 1)
        return f"{namespace}/", name

    raise ValueError("all IRIs should contain a slash")
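
A brief usage sketch shows that "#" takes precedence over "/" and that the separator stays with the namespace. The function body is copied from the listing above so the example runs without tripper; the IRIs are arbitrary examples.

```python
from typing import Tuple


def split_iri(iri: str) -> Tuple[str, str]:
    """Copy of the function above, repeated so the example is standalone."""
    if "#" in iri:
        namespace, name = iri.rsplit("#", 1)
        return f"{namespace}#", name

    if "/" in iri:
        namespace, name = iri.rsplit("/", 1)
        return f"{namespace}/", name

    raise ValueError("all IRIs should contain a slash")


hash_split = split_iri("http://example.com/onto#Atom")
slash_split = split_iri("http://example.com/onto/Atom")
print(hash_split)
print(slash_split)

# An IRI with neither "#" nor "/" cannot be split.
try:
    split_iri("urn:isbn:123")
    raised = False
except ValueError:
    raised = True
print(raised)
```

Joining the two parts with `namespace + name` gives back the original IRI.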

tfilter(triples, subject=None, predicate=None, object=None)

Filters out non-matching triples.

Parameters:

Name Type Description Default
triples Iterable[Triple]

Triples to filter from.

required
subject Optional[Union[Iterable[str], str]]

If given, only keep triples whose subject matches subject. Can be an iterable of subjects.

None
predicate Optional[Union[Iterable[str], str]]

If given, only keep triples whose predicate matches predicate. Can be an iterable of predicates.

None
object Optional[Union[Iterable, str, Literal]]

If given, only keep triples whose object matches object. Can be an iterable of objects.

None

Returns:

Type Description
Generator[Triple, None, None]

A generator over matching triples.

Source code in tripper/utils.py
def tfilter(
    triples: "Iterable[Triple]",
    subject: "Optional[Union[Iterable[str], str]]" = None,
    predicate: "Optional[Union[Iterable[str], str]]" = None,
    object: "Optional[Union[Iterable, str, Literal]]" = None,
) -> "Generator[Triple, None, None]":
    """Filters out non-matching triples.

    Parameters:
        triples: Triples to filter from.
        subject: If given, only keep triples whose subject matches `subject`.
            Can be an iterable of subjects.
        predicate: If given, only keep triples whose predicate matches
            `predicate`.  Can be an iterable of predicates.
        object: If given, only keep triples whose object matches `object`.
            Can be an iterable of objects.

    Returns:
        A generator over matching triples.
    """
    for s, p, o in triples:
        if subject and (
            s != subject if isinstance(subject, str) else s not in subject
        ):
            continue
        if predicate and (
            p != predicate
            if isinstance(predicate, str)
            else p not in predicate
        ):
            continue
        if object and (
            o != object
            if isinstance(object, (str, Literal))
            else o not in object
        ):
            continue
        yield s, p, o
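
The matching rule (a single value is compared for equality, an iterable is tested for membership, and an unset criterion matches everything) can be tried out with plain string triples. The sketch below is a simplified stand-in restricted to strings, so it needs no Literal class. The names `matches` and `tfilter_sketch` and the example triples are hypothetical.

```python
from typing import Iterable, Iterator, Optional, Tuple, Union

Triple = Tuple[str, str, str]
Criterion = Optional[Union[Iterable[str], str]]


def matches(value: str, criterion: Criterion) -> bool:
    """True if `criterion` is unset, equals `value`, or contains it."""
    if not criterion:
        return True
    if isinstance(criterion, str):
        return value == criterion
    return value in criterion


def tfilter_sketch(
    triples: Iterable[Triple],
    subject: Criterion = None,
    predicate: Criterion = None,
    object: Criterion = None,  # name kept to mirror tfilter()
) -> Iterator[Triple]:
    """String-only stand-in for tfilter()."""
    for s, p, o in triples:
        if matches(s, subject) and matches(p, predicate) and matches(o, object):
            yield s, p, o


triples = [
    (":a", "rdf:type", ":Cat"),
    (":b", "rdf:type", ":Dog"),
    (":a", ":likes", ":b"),
]

about_a = list(tfilter_sketch(triples, subject=":a"))
pets = list(
    tfilter_sketch(triples, predicate="rdf:type", object=[":Cat", ":Dog"])
)
print(about_a)
print(pets)
```

Since the result is a generator, it is wrapped in `list()` here to materialise the matches.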