"""Python implementation of the W3C Provenance Data Model (PROV-DM), including
support for PROV-JSON import/export
References:
PROV-DM: http://www.w3.org/TR/prov-dm/
PROV-JSON: https://openprovenance.org/prov-json/
"""
from __future__ import annotations # needed for | type annotations in Python < 3.10
from collections import defaultdict
import datetime
import io
import itertools
import logging
import os
import shutil
import tempfile
from io import IOBase
from typing import (
Any,
Callable,
Iterable,
Optional,
Union,
)
import typing # to use typing.TypeAlias in comments for compatibility with Python 3.9
from urllib.parse import urlparse
import dateutil.parser
from prov import Error, serializers
from prov.constants import *
from prov.identifier import Identifier, QualifiedName, Namespace
__author__ = "Trung Dong Huynh"
__email__ = "trungdong@donggiang.com"
logger = logging.getLogger(__name__)
# Type aliases for convenience
QualifiedNameCandidate = Union[QualifiedName, str, Identifier] # type: typing.TypeAlias
OptionalID = Optional[QualifiedNameCandidate] # type: typing.TypeAlias
EntityRef = Union["ProvEntity", QualifiedNameCandidate] # type: typing.TypeAlias
ActivityRef = Union["ProvActivity", QualifiedNameCandidate] # type: typing.TypeAlias
AgentRef = Union[
"ProvAgent", "ProvEntity", "ProvActivity", QualifiedNameCandidate
] # type: typing.TypeAlias
GenrationRef = Union["ProvGeneration", QualifiedNameCandidate] # type: typing.TypeAlias
UsageRef = Union["ProvUsage", QualifiedNameCandidate] # type: typing.TypeAlias
RecordAttributesArg = Union[
dict[QualifiedNameCandidate, Any],
Iterable[tuple[QualifiedNameCandidate, Any]],
] # type: typing.TypeAlias
NameValuePair = tuple[QualifiedName, Any] # type: typing.TypeAlias
DatetimeOrStr = Union[datetime.datetime, str] # type: typing.TypeAlias
NSCollection = Union[dict[str, str], Iterable[Namespace]] # type: typing.TypeAlias
PathLike = Union[str, bytes, os.PathLike] # type: typing.TypeAlias
# Data Types
def _ensure_datetime(value: Optional[DatetimeOrStr]) -> Optional[datetime.datetime]:
if isinstance(value, str):
return dateutil.parser.parse(value)
else:
return value
[docs]
def parse_xsd_datetime(value: str) -> Optional[datetime.datetime]:
try:
return dateutil.parser.parse(value)
except ValueError:
pass
return None
[docs]
def parse_boolean(value: str) -> Optional[bool]:
if value.lower() in ("false", "0"):
return False
elif value.lower() in ("true", "1"):
return True
else:
return None
DATATYPE_PARSERS = {
datetime.datetime: parse_xsd_datetime,
}
# Mappings for XSD datatypes to Python standard types
SupportedXSDParsedTypes = Union[
str, datetime.datetime, float, int, bool, Identifier, None
] # type: typing.TypeAlias
XSD_DATATYPE_PARSERS: dict[QualifiedName, Callable[[str], SupportedXSDParsedTypes]] = {
XSD_STRING: str,
XSD_DOUBLE: float,
XSD_LONG: int,
XSD_INT: int,
XSD_BOOLEAN: parse_boolean,
XSD_DATETIME: parse_xsd_datetime,
XSD_ANYURI: Identifier,
}
[docs]
def parse_xsd_types(value: str, datatype: QualifiedName) -> SupportedXSDParsedTypes:
return (
XSD_DATATYPE_PARSERS[datatype](value)
if datatype in XSD_DATATYPE_PARSERS
else None
)
[docs]
def first(a_set: set[Any]) -> Any | None:
return next(iter(a_set), None)
def _ensure_multiline_string_triple_quoted(value: str) -> str:
# converting the value to a string
s = str(value)
# Escaping any double quote
s = s.replace('"', '\\"')
if "\n" in s:
return '"""%s"""' % s
else:
return '"%s"' % s
[docs]
def encoding_provn_value(
value: str | datetime.datetime | float | bool | QualifiedName,
) -> str:
if isinstance(value, str):
return _ensure_multiline_string_triple_quoted(value)
elif isinstance(value, datetime.datetime):
return '"{0}" %% xsd:dateTime'.format(value.isoformat())
elif isinstance(value, float):
return '"%g" %%%% xsd:float' % value
elif isinstance(value, bool):
return '"%i" %%%% xsd:boolean' % value
else:
# TODO: QName export
return str(value)
[docs]
class Literal(object):
def __init__(
self,
value: Any,
datatype: Optional[QualifiedName] = None,
langtag: Optional[str] = None,
):
self._value: str = str(value) # value is always a string
if langtag:
if datatype is None:
logger.debug(
"Assuming prov:InternationalizedString as the type of "
'"%s"@%s' % (value, langtag)
)
datatype = PROV_INTERNATIONALIZEDSTRING
# PROV JSON states that the type field must not be set when
# using the lang attribute and PROV XML requires it to be an
# internationalized string.
elif datatype != PROV_INTERNATIONALIZEDSTRING:
logger.warning(
'Invalid data type (%s) for "%s"@%s, overridden as '
"prov:InternationalizedString." % (datatype, value, langtag)
)
datatype = PROV_INTERNATIONALIZEDSTRING
self._datatype: Optional[QualifiedName] = datatype
# langtag is always a string
self._langtag: Optional[str] = str(langtag) if langtag is not None else None
def __str__(self) -> str:
return self.provn_representation()
def __repr__(self) -> str:
return "<Literal: %s>" % self.provn_representation()
def __eq__(self, other: Any) -> bool:
return (
(
self._value == other.value
and self._datatype == other.datatype
and self._langtag == other.langtag
)
if isinstance(other, Literal)
else False
)
def __ne__(self, other: Any) -> bool:
return not (self == other)
def __hash__(self) -> int:
return hash((self._value, self._datatype, self._langtag))
@property
def value(self) -> str:
return self._value
@property
def datatype(self) -> QualifiedName | None:
return self._datatype
@property
def langtag(self) -> str | None:
return self._langtag
[docs]
def has_no_langtag(self) -> bool:
return self._langtag is None
[docs]
def provn_representation(self) -> str:
if self._langtag:
# a language tag can only go with prov:InternationalizedString
return "%s@%s" % (
_ensure_multiline_string_triple_quoted(self._value),
str(self._langtag),
)
else:
return "%s %%%% %s" % (
_ensure_multiline_string_triple_quoted(self._value),
str(self._datatype),
)
# Exceptions and warnings
[docs]
class ProvException(Error):
"""Base class for PROV model exceptions."""
pass
[docs]
class ProvWarning(Warning):
"""Base class for PROV model warnings."""
pass
[docs]
class ProvExceptionInvalidQualifiedName(ProvException):
"""Exception for an invalid qualified identifier name."""
qname = None
"""Intended qualified name."""
def __init__(self, qname: Any):
"""
Constructor.
:param qname: Invalid qualified name.
"""
self.qname = qname
def __str__(self) -> str:
return "Invalid Qualified Name: %s" % self.qname
[docs]
class ProvElementIdentifierRequired(ProvException):
"""Exception for a missing element identifier."""
def __str__(self) -> str:
return (
"An identifier is missing. All PROV elements require a valid " "identifier."
)
# PROV records
[docs]
class ProvRecord(object):
"""Base class for PROV records."""
FORMAL_ATTRIBUTES = () # type: tuple[QualifiedName, ...]
"""Formal attributes names of this record type, in the expected order."""
_prov_type: Optional[QualifiedName] = None
"""PROV type of record."""
def __init__(
self,
bundle: ProvBundle,
identifier: Optional[QualifiedName],
attributes: Optional[RecordAttributesArg] = None,
):
"""
Constructor.
:param bundle: Bundle for the PROV record.
:param identifier: (Unique) identifier of the record.
:param attributes: Attributes to associate with the record (default: None).
"""
self._bundle = bundle
self._identifier = identifier
self._attributes: dict[QualifiedName, set] = defaultdict(set)
if attributes:
self.add_attributes(attributes)
def __hash__(self) -> int:
return hash((self.get_type(), self._identifier, frozenset(self.attributes)))
[docs]
def copy(self) -> "ProvRecord":
"""
Return an exact copy of this record.
"""
return PROV_REC_CLS[self.get_type()](
self._bundle, self.identifier, self.attributes
)
[docs]
def get_type(self) -> QualifiedName:
"""Returns the PROV type of the record."""
if self._prov_type is not None:
return self._prov_type
else:
raise NotImplementedError("Type not defined for this record.")
[docs]
def get_asserted_types(self) -> set[QualifiedName]:
"""Returns the set of all asserted PROV types of this record."""
return self._attributes[PROV_TYPE]
[docs]
def add_asserted_type(self, type_identifier: QualifiedName) -> None:
"""
Adds a PROV type assertion to the record.
:param type_identifier: PROV namespace identifier to add.
"""
self._attributes[PROV_TYPE].add(type_identifier)
[docs]
def get_attribute(self, attr_name: QualifiedNameCandidate) -> set:
"""
Returns the attribute values (if any) for the specified attribute name).
:param attr_name: Name of the attribute.
:return: Set of value(s) of the specified attribute.
:rtype: set
"""
attr_name_qn = self._bundle.mandatory_valid_qname(attr_name)
return self._attributes[attr_name_qn]
@property
def identifier(self) -> QualifiedName | None:
"""Record's identifier."""
return self._identifier
@property
def attributes(self) -> list[tuple[QualifiedName, Any]]:
"""
All record attributes.
:return: List of tuples (name, value)
"""
return [
(attr_name, value)
for attr_name, values in self._attributes.items()
for value in values
]
@property
def args(self) -> tuple:
"""
All values of the record's formal attributes.
:return: Tuple
"""
return tuple(
first(self._attributes[attr_name]) for attr_name in self.FORMAL_ATTRIBUTES
)
@property
def formal_attributes(self) -> tuple[tuple[QualifiedName, Any], ...]:
"""
All names and values of the record's formal attributes.
:return: Tuple of tuples (name, value)
"""
return tuple(
(attr_name, first(self._attributes[attr_name]))
for attr_name in self.FORMAL_ATTRIBUTES
)
@property
def extra_attributes(self) -> tuple[tuple[QualifiedName, Any], ...]:
"""
All names and values of the record's attributes that are not formal
attributes.
:return: Tuple of tuples (name, value)
"""
return tuple(
(attr_name, attr_value)
for attr_name, attr_value in self.attributes
if attr_name not in self.FORMAL_ATTRIBUTES
)
@property
def bundle(self) -> ProvBundle:
"""
Bundle of the record.
:return: :py:class:`ProvBundle`
"""
return self._bundle
@property
def label(self) -> str:
"""Identifying label of the record."""
return str(
first(self._attributes[PROV_LABEL])
if self._attributes[PROV_LABEL]
else self._identifier
)
@property
def value(self) -> Any:
"""Value of the record."""
return self._attributes[PROV_VALUE]
# Handling attributes
def _auto_literal_conversion(self, literal: Any) -> Any:
# This method normalise datatype for literals
if isinstance(literal, ProvRecord):
# Use the QName of the record as the literal
literal = literal.identifier
if isinstance(literal, str):
return str(literal)
elif isinstance(literal, QualifiedName):
return self._bundle.valid_qualified_name(literal)
elif isinstance(literal, Literal) and literal.has_no_langtag():
if literal.datatype:
# try to convert a generic Literal object to Python standard type
# to match the JSON decoding's literal conversion
value = parse_xsd_types(literal.value, literal.datatype)
else:
# A literal with no datatype nor langtag defined
# try auto-converting the value
value = self._auto_literal_conversion(literal.value)
if value is not None:
return value
# No conversion possible, return the original value
return literal
[docs]
def add_attributes(self, attributes: RecordAttributesArg) -> None:
"""
Add attributes to the record.
:param attributes: Dictionary of attributes, with keys being qualified
identifiers. Alternatively, an iterable of tuples (key, value) with the
keys satisfying the same condition.
"""
if attributes:
if isinstance(attributes, dict):
# Converting the dictionary into a list of tuples
# (i.e. attribute-value pairs)
attributes = attributes.items()
# Check if one of the attributes specifies that the current type
# is a collection. In that case multiple attributes of the same
# type are allowed.
if PROV_ATTR_COLLECTION in [_i[0] for _i in attributes]:
is_collection = True
else:
is_collection = False
for attr_name, original_value in attributes:
if original_value is None:
continue
# make sure the attribute name is valid
attr = self._bundle.mandatory_valid_qname(attr_name)
if attr in PROV_ATTRIBUTE_QNAMES:
# Expecting a qualified name
if isinstance(original_value, ProvRecord):
# Use the identifier of the record, which must exist, as the value for this attribute
qname = original_value.identifier
if qname is None:
raise ProvException(
f"Invalid value for attribute {attr}: {original_value}."
f" The record has no identifier."
)
else:
qname = original_value
value = self._bundle.mandatory_valid_qname(qname) # type: Any
elif attr in PROV_ATTRIBUTE_LITERALS:
# Expecting a datetime object or a string that can be parsed as a datetime
if isinstance(original_value, str):
value = parse_xsd_datetime(original_value)
else:
value = original_value
if not isinstance(value, datetime.datetime):
raise ProvException(
f"Invalid value for attribute {attr}: {original_value}. "
f"Expected a datetime object or a string that can be parsed"
f" as a datetime."
)
else:
value = self._auto_literal_conversion(original_value)
if value is None:
raise ProvException(
"Invalid value for attribute %s: %s" % (attr, original_value)
)
if (
not is_collection
and attr in PROV_ATTRIBUTES
and self._attributes[attr]
):
existing_value = first(self._attributes[attr])
is_not_same_value = True
try:
is_not_same_value = value != existing_value
except TypeError:
# Cannot compare them
pass # consider them different values
if is_not_same_value:
raise ProvException(
"Cannot have more than one value for attribute %s" % attr
)
else:
# Same value, ignore it
continue
self._attributes[attr].add(value)
def __eq__(self, other: Any) -> bool:
if not isinstance(other, ProvRecord):
return False
if self.get_type() != other.get_type():
return False
if self._identifier and not (self._identifier == other._identifier):
return False
return set(self.attributes) == set(other.attributes)
def __str__(self) -> str:
return self.get_provn()
[docs]
def get_provn(self) -> str:
"""
Returns the PROV-N representation of the record.
:return: String
"""
items = []
# Generating identifier
relation_id = "" # default blank
if self._identifier:
identifier = str(self._identifier) # TODO: QName export
if self.is_element():
items.append(identifier)
else:
# this is a relation, which relation uses a semicolon to separate identifiers
relation_id = identifier + "; "
# Writing out the formal attributes
for attr in self.FORMAL_ATTRIBUTES:
if attr in self._attributes and self._attributes[attr]:
# Formal attributes always have single values
value = first(self._attributes[attr])
# TODO: QName export
items.append(
value.isoformat()
if isinstance(value, datetime.datetime)
else str(value)
)
else:
items.append("-")
# Writing out the remaining attributes
extra = []
for attr in self._attributes:
if attr not in self.FORMAL_ATTRIBUTES:
for value in self._attributes[attr]:
try:
# try if there is a prov-n representation defined
provn_represenation = value.provn_representation()
except AttributeError:
provn_represenation = encoding_provn_value(value)
# TODO: QName export
extra.append("%s=%s" % (str(attr), provn_represenation))
if extra:
items.append("[%s]" % ", ".join(extra))
prov_n = "%s(%s%s)" % (
PROV_N_MAP[self.get_type()],
relation_id,
", ".join(items),
)
return prov_n
[docs]
def is_element(self) -> bool:
"""
True, if the record is an element, False otherwise.
:return: bool
"""
return False
[docs]
def is_relation(self) -> bool:
"""
True, if the record is a relation, False otherwise.
:return: bool
"""
return False
# Abstract classes for elements and relations
[docs]
class ProvElement(ProvRecord):
"""Provenance Element (nodes in the provenance graph)."""
def __init__(
self,
bundle: ProvBundle,
identifier: Optional[QualifiedName],
attributes: Optional[RecordAttributesArg] = None,
):
if identifier is None:
# All types of PROV elements require a valid identifier
raise ProvElementIdentifierRequired()
super(ProvElement, self).__init__(bundle, identifier, attributes)
[docs]
def is_element(self) -> bool:
"""
True, if the record is an element, False otherwise.
:return: bool
"""
return True
def __repr__(self) -> str:
return "<%s: %s>" % (self.__class__.__name__, self._identifier)
[docs]
class ProvRelation(ProvRecord):
"""Provenance Relationship (edge between nodes)."""
[docs]
def is_relation(self) -> bool:
"""
True, if the record is a relation, False otherwise.
:return: bool
"""
return True
def __repr__(self) -> str:
identifier = " %s" % self._identifier if self._identifier else ""
element_1, element_2 = [qname for _, qname in self.formal_attributes[:2]]
return "<%s:%s (%s, %s)>" % (
self.__class__.__name__,
identifier,
element_1,
element_2,
)
# Component 1: Entities and Activities
[docs]
class ProvEntity(ProvElement):
"""Provenance Entity element"""
_prov_type = PROV_ENTITY
# Convenient assertions that take the current ProvEntity as the first
# (formal) argument
[docs]
def wasGeneratedBy(
self,
activity: Optional[ActivityRef] = None,
time: Optional[DatetimeOrStr] = None,
attributes: Optional[RecordAttributesArg] = None,
) -> ProvEntity:
"""
Creates a new generation record to this entity.
:param activity: Activity or string identifier of the activity involved in
the generation (default: None).
:param time: Optional time for the generation (default: None).
Either a :py:class:`datetime.datetime` object or a string that can be
parsed by :py:func:`dateutil.parser`.
:param attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
self._bundle.generation(self, activity, time, other_attributes=attributes)
return self
[docs]
def wasInvalidatedBy(
self,
activity: Optional[ActivityRef],
time: Optional[DatetimeOrStr] = None,
attributes: Optional[RecordAttributesArg] = None,
) -> ProvEntity:
"""
Creates a new invalidation record for this entity.
:param activity: Activity or string identifier of the activity involved in
the invalidation (default: None).
:param time: Optional time for the invalidation (default: None).
Either a :py:class:`datetime.datetime` object or a string that can be
parsed by :py:func:`dateutil.parser`.
:param attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
self._bundle.invalidation(self, activity, time, other_attributes=attributes)
return self
[docs]
def wasDerivedFrom(
self,
usedEntity: EntityRef,
activity: Optional[ActivityRef] = None,
generation: Optional[GenrationRef] = None,
usage: Optional[UsageRef] = None,
attributes: Optional[RecordAttributesArg] = None,
) -> ProvEntity:
"""
Creates a new derivation record for this entity from a used entity.
:param usedEntity: Entity or a string identifier for the used entity.
:param activity: Activity or string identifier of the activity involved in
the derivation (default: None).
:param generation: Optional generation record to state qualified derivation
through an internal generation (default: None).
:param usage: Optional usage record to state qualified derivation through
an internal usage (default: None).
:param attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
self._bundle.derivation(
self, usedEntity, activity, generation, usage, other_attributes=attributes
)
return self
[docs]
def wasAttributedTo(
self, agent: AgentRef, attributes: Optional[RecordAttributesArg] = None
) -> ProvEntity:
"""
Creates a new attribution record between this entity and an agent.
:param agent: Agent or string identifier of the agent involved in the
attribution.
:param attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
self._bundle.attribution(self, agent, other_attributes=attributes)
return self
[docs]
def alternateOf(self, alternate2: EntityRef) -> ProvEntity:
"""
Creates a new alternate record between this and another entity.
:param alternate2: Entity or a string identifier for the second entity.
"""
self._bundle.alternate(self, alternate2)
return self
[docs]
def specializationOf(self, generalEntity: EntityRef) -> ProvEntity:
"""
Creates a new specialisation record for this from a general entity.
:param generalEntity: Entity or a string identifier for the general entity.
"""
self._bundle.specialization(self, generalEntity)
return self
[docs]
def hadMember(self, entity: EntityRef) -> ProvEntity:
"""
Creates a new membership record to an entity for a collection.
:param entity: Entity to be added to the collection.
"""
self._bundle.membership(self, entity)
return self
[docs]
class ProvActivity(ProvElement):
"""Provenance Activity element."""
FORMAL_ATTRIBUTES = (PROV_ATTR_STARTTIME, PROV_ATTR_ENDTIME)
_prov_type = PROV_ACTIVITY
# Convenient methods
[docs]
def set_time(
self,
startTime: Optional[datetime.datetime] = None,
endTime: Optional[datetime.datetime] = None,
) -> None:
"""
Sets the time this activity took place.
:param startTime: Start time for the activity.
Either a :py:class:`datetime.datetime` object or a string that can be
parsed by :py:func:`dateutil.parser`.
:param endTime: Start time for the activity.
Either a :py:class:`datetime.datetime` object or a string that can be
parsed by :py:func:`dateutil.parser`.
"""
if startTime is not None:
self._attributes[PROV_ATTR_STARTTIME] = {startTime}
if endTime is not None:
self._attributes[PROV_ATTR_ENDTIME] = {endTime}
[docs]
def get_startTime(self) -> datetime.datetime | None:
"""
Returns the time the activity started.
:return: :py:class:`datetime.datetime`
"""
values = self._attributes[PROV_ATTR_STARTTIME]
return first(values) if values else None
[docs]
def get_endTime(self) -> datetime.datetime | None:
"""
Returns the time the activity ended.
:return: :py:class:`datetime.datetime`
"""
values = self._attributes[PROV_ATTR_ENDTIME]
return first(values) if values else None
# Convenient assertions that take the current ProvActivity as the first
# (formal) argument
[docs]
def used(
self,
entity: EntityRef,
time: Optional[DatetimeOrStr] = None,
attributes: Optional[RecordAttributesArg] = None,
) -> ProvActivity:
"""
Creates a new usage record for this activity.
:param entity: Entity or string identifier of the entity involved in
the usage relationship (default: None).
:param time: Optional time for the usage (default: None).
Either a :py:class:`datetime.datetime` object or a string that can be
parsed by :py:func:`dateutil.parser`.
:param attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
self._bundle.usage(self, entity, time, other_attributes=attributes)
return self
[docs]
def wasStartedBy(
self,
trigger: Optional[EntityRef],
starter: Optional[ActivityRef] = None,
time: Optional[DatetimeOrStr] = None,
attributes: Optional[RecordAttributesArg] = None,
) -> ProvActivity:
"""
Creates a new start record for this activity. The activity did not exist
before the start by the trigger.
:param trigger: Entity triggering the start of this activity.
:param starter: Optional extra activity to state a qualified start
through which the trigger entity for the start is generated
(default: None).
:param time: Optional time for the start (default: None).
Either a :py:class:`datetime.datetime` object or a string that can be
parsed by :py:func:`dateutil.parser`.
:param attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
self._bundle.start(self, trigger, starter, time, other_attributes=attributes)
return self
[docs]
def wasEndedBy(
self,
trigger: Optional[EntityRef],
ender: Optional[ActivityRef] = None,
time: Optional[DatetimeOrStr] = None,
attributes: Optional[RecordAttributesArg] = None,
) -> ProvActivity:
"""
Creates a new end record for this activity.
:param trigger: Entity triggering the end of this activity.
:param ender: Optionally extra activity to state a qualified end through
which the trigger entity for the end is generated (default: None).
:param time: Optional time for the end (default: None).
Either a :py:class:`datetime.datetime` object or a string that can be
parsed by :py:func:`dateutil.parser`.
:param attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
self._bundle.end(self, trigger, ender, time, other_attributes=attributes)
return self
[docs]
def wasAssociatedWith(
self,
agent: AgentRef,
plan: Optional[EntityRef] = None,
attributes: Optional[RecordAttributesArg] = None,
) -> ProvActivity:
"""
Creates a new association record for this activity.
:param agent: Agent or string identifier of the agent involved in the
association (default: None).
:param plan: Optionally extra entity to state qualified association through
an internal plan (default: None).
:param attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
self._bundle.association(self, agent, plan, other_attributes=attributes)
return self
[docs]
class ProvGeneration(ProvRelation):
"""Provenance Generation relationship."""
FORMAL_ATTRIBUTES = (PROV_ATTR_ENTITY, PROV_ATTR_ACTIVITY, PROV_ATTR_TIME)
_prov_type = PROV_GENERATION
[docs]
class ProvUsage(ProvRelation):
"""Provenance Usage relationship."""
FORMAL_ATTRIBUTES = (PROV_ATTR_ACTIVITY, PROV_ATTR_ENTITY, PROV_ATTR_TIME)
_prov_type = PROV_USAGE
[docs]
class ProvCommunication(ProvRelation):
"""Provenance Communication relationship."""
FORMAL_ATTRIBUTES = (PROV_ATTR_INFORMED, PROV_ATTR_INFORMANT)
_prov_type = PROV_COMMUNICATION
[docs]
class ProvStart(ProvRelation):
"""Provenance Start relationship."""
FORMAL_ATTRIBUTES = (
PROV_ATTR_ACTIVITY,
PROV_ATTR_TRIGGER,
PROV_ATTR_STARTER,
PROV_ATTR_TIME,
)
_prov_type = PROV_START
[docs]
class ProvEnd(ProvRelation):
"""Provenance End relationship."""
FORMAL_ATTRIBUTES = (
PROV_ATTR_ACTIVITY,
PROV_ATTR_TRIGGER,
PROV_ATTR_ENDER,
PROV_ATTR_TIME,
)
_prov_type = PROV_END
[docs]
class ProvInvalidation(ProvRelation):
"""Provenance Invalidation relationship."""
FORMAL_ATTRIBUTES = (PROV_ATTR_ENTITY, PROV_ATTR_ACTIVITY, PROV_ATTR_TIME)
_prov_type = PROV_INVALIDATION
# Component 2: Derivations
[docs]
class ProvDerivation(ProvRelation):
"""Provenance Derivation relationship."""
FORMAL_ATTRIBUTES = (
PROV_ATTR_GENERATED_ENTITY,
PROV_ATTR_USED_ENTITY,
PROV_ATTR_ACTIVITY,
PROV_ATTR_GENERATION,
PROV_ATTR_USAGE,
)
_prov_type = PROV_DERIVATION
# Component 3: Agents, Responsibility, and Influence
[docs]
class ProvAgent(ProvElement):
"""Provenance Agent element."""
_prov_type = PROV_AGENT
# Convenient assertions that take the current ProvAgent as the first
# (formal) argument
[docs]
def actedOnBehalfOf(
self,
responsible: AgentRef,
activity: Optional[ActivityRef] = None,
attributes: Optional[RecordAttributesArg] = None,
) -> ProvAgent:
"""
Creates a new delegation record on behalf of this agent.
:param responsible: Agent the responsibility is delegated to.
:param activity: Optionally extra activity to state qualified delegation
internally (default: None).
:param attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
self._bundle.delegation(
self, responsible, activity, other_attributes=attributes
)
return self
[docs]
class ProvAttribution(ProvRelation):
"""Provenance Attribution relationship."""
FORMAL_ATTRIBUTES = (PROV_ATTR_ENTITY, PROV_ATTR_AGENT)
_prov_type = PROV_ATTRIBUTION
[docs]
class ProvAssociation(ProvRelation):
"""Provenance Association relationship."""
FORMAL_ATTRIBUTES = (PROV_ATTR_ACTIVITY, PROV_ATTR_AGENT, PROV_ATTR_PLAN)
_prov_type = PROV_ASSOCIATION
[docs]
class ProvDelegation(ProvRelation):
"""Provenance Delegation relationship."""
FORMAL_ATTRIBUTES = (PROV_ATTR_DELEGATE, PROV_ATTR_RESPONSIBLE, PROV_ATTR_ACTIVITY)
_prov_type = PROV_DELEGATION
[docs]
class ProvInfluence(ProvRelation):
"""Provenance Influence relationship."""
FORMAL_ATTRIBUTES = (PROV_ATTR_INFLUENCEE, PROV_ATTR_INFLUENCER)
_prov_type = PROV_INFLUENCE
# Component 5: Alternate Entities
[docs]
class ProvSpecialization(ProvRelation):
"""Provenance Specialization relationship."""
FORMAL_ATTRIBUTES = (
PROV_ATTR_SPECIFIC_ENTITY,
PROV_ATTR_GENERAL_ENTITY,
) # type: tuple[QualifiedName, ...]
_prov_type = PROV_SPECIALIZATION
[docs]
class ProvAlternate(ProvRelation):
"""Provenance Alternate relationship."""
FORMAL_ATTRIBUTES = (PROV_ATTR_ALTERNATE1, PROV_ATTR_ALTERNATE2)
_prov_type = PROV_ALTERNATE
[docs]
class ProvMention(ProvSpecialization):
"""Provenance Mention relationship (specific Specialization)."""
FORMAL_ATTRIBUTES = (
PROV_ATTR_SPECIFIC_ENTITY,
PROV_ATTR_GENERAL_ENTITY,
PROV_ATTR_BUNDLE,
)
_prov_type = PROV_MENTION
# Component 6: Collections
[docs]
class ProvMembership(ProvRelation):
"""Provenance Membership relationship."""
FORMAL_ATTRIBUTES = (PROV_ATTR_COLLECTION, PROV_ATTR_ENTITY)
_prov_type = PROV_MEMBERSHIP
# Class mappings from PROV record type
PROV_REC_CLS = {
PROV_ENTITY: ProvEntity,
PROV_ACTIVITY: ProvActivity,
PROV_GENERATION: ProvGeneration,
PROV_USAGE: ProvUsage,
PROV_COMMUNICATION: ProvCommunication,
PROV_START: ProvStart,
PROV_END: ProvEnd,
PROV_INVALIDATION: ProvInvalidation,
PROV_DERIVATION: ProvDerivation,
PROV_AGENT: ProvAgent,
PROV_ATTRIBUTION: ProvAttribution,
PROV_ASSOCIATION: ProvAssociation,
PROV_DELEGATION: ProvDelegation,
PROV_INFLUENCE: ProvInfluence,
PROV_SPECIALIZATION: ProvSpecialization,
PROV_ALTERNATE: ProvAlternate,
PROV_MENTION: ProvMention,
PROV_MEMBERSHIP: ProvMembership,
}
DEFAULT_NAMESPACES = {"prov": PROV, "xsd": XSD, "xsi": XSI}
# Bundle
[docs]
class NamespaceManager(dict):
"""Manages namespaces for PROV documents and bundles."""
parent = None # type: Optional[NamespaceManager]
"""Parent :py:class:`NamespaceManager` this manager one is a child of."""
def __init__(
self,
namespaces: Optional[NSCollection] = None,
default: Optional[str] = None,
parent: Optional[NamespaceManager] = None,
):
"""
Constructor.
:param namespaces: Optional namespaces to add to the manager
(default: None).
:param default: Optional default namespace to use (default: None).
:param parent: Optional parent :py:class:`NamespaceManager` to make this
namespace manager a child of (default: None).
"""
dict.__init__(self)
self._default_namespaces = DEFAULT_NAMESPACES
self.update(self._default_namespaces)
self._namespaces = {} # type: dict[str, Namespace]
if default is not None:
self.set_default_namespace(default)
else:
self._default = None # type: Optional[Namespace]
self.parent = parent
# TODO check if default is in the default namespaces
self._anon_id_count = 0
self._uri_map = dict() # type: dict[str, Namespace]
self._rename_map = dict() # type: dict[Namespace, Namespace]
self._prefix_renamed_map = dict() # type: dict[str, Namespace]
if namespaces is not None:
self.add_namespaces(namespaces)
[docs]
def get_namespace(self, uri: str) -> Namespace | None:
"""
Returns the namespace prefix for the given URI.
:param uri: Namespace URI.
:return: :py:class:`~prov.identifier.Namespace`.
"""
for namespace in self.values():
if uri == namespace._uri:
return namespace
return None
[docs]
def get_registered_namespaces(self) -> Iterable[Namespace]:
"""
Returns all registered namespaces.
:return: Iterable of :py:class:`~prov.identifier.Namespace`.
"""
return self._namespaces.values()
[docs]
def set_default_namespace(self, uri: str) -> None:
"""
Sets the default namespace to the one of a given URI.
:param uri: Namespace URI.
"""
self._default = Namespace("", uri)
self[""] = self._default
[docs]
def get_default_namespace(self) -> Namespace | None:
"""
Returns the default namespace.
:return: :py:class:`~prov.identifier.Namespace`
"""
return self._default
[docs]
def add_namespace(self, namespace: Namespace) -> Namespace:
"""
Adds a namespace (if not available, yet).
:param namespace: :py:class:`~prov.identifier.Namespace` to add.
"""
if namespace in self.values():
# no need to do anything
return namespace
if namespace in self._rename_map:
# already renamed and added
return self._rename_map[namespace]
# Checking if the URI has been defined and use the existing namespace
# instead
uri = namespace.uri
prefix = namespace.prefix
if uri in self._uri_map:
existing_ns = self._uri_map[uri]
self._rename_map[namespace] = existing_ns
self._prefix_renamed_map[prefix] = existing_ns
return existing_ns
if prefix in self:
# Conflicting prefix
new_prefix = self._get_unused_prefix(prefix)
new_namespace = Namespace(new_prefix, namespace.uri)
self._rename_map[namespace] = new_namespace
# TODO: What if the prefix is already in the map and point to a
# different Namespace? Raise an exception?
self._prefix_renamed_map[prefix] = new_namespace
prefix = new_prefix
namespace = new_namespace
# Only now add the namespace to the registry
self._namespaces[prefix] = namespace
self[prefix] = namespace
self._uri_map[uri] = namespace
return namespace
[docs]
def add_namespaces(self, namespaces: NSCollection) -> None:
"""
Add multiple namespaces into this manager.
:param namespaces: A collection of namespace(s) to add.
:type namespaces: List of :py:class:`~prov.identifier.Namespace` or
dict of {prefix: uri}.
:returns: None
"""
if isinstance(namespaces, dict):
# expecting a dictionary of {prefix: uri},
# convert it to a list of Namespace
namespaces = [Namespace(prefix, uri) for prefix, uri in namespaces.items()]
if namespaces:
for ns in namespaces:
self.add_namespace(ns)
[docs]
def valid_qualified_name(
self, qname: QualifiedNameCandidate
) -> QualifiedName | None:
"""
Resolves an identifier to a valid qualified name.
:param qname: Qualified name as :py:class:`~prov.identifier.QualifiedName`
or a tuple (namespace, identifier).
:return: :py:class:`~prov.identifier.QualifiedName` or None in case of
failure.
"""
if not qname:
return None
if isinstance(qname, QualifiedName):
# Register the namespace if it has not been registered before
namespace = qname.namespace
prefix = namespace.prefix
local_part = qname.localpart
if not prefix:
# the namespace is a default namespace
if self._default == namespace:
# the same default namespace is defined
new_qname = self._default[local_part]
elif self._default is None:
# no default namespace is defined, reused the one given
self._default = namespace
return qname # no change, return the original
else:
# different default namespace,
# use the 'dn' prefix for the new namespace
dn_namespace = Namespace("dn", namespace.uri)
dn_namespace = self.add_namespace(dn_namespace)
new_qname = dn_namespace[local_part]
elif prefix in self and self[prefix] == namespace:
# No need to add the namespace
existing_ns = self[prefix]
if existing_ns is namespace:
return qname
else:
# reuse the existing namespace
new_qname = existing_ns[local_part]
else:
# Do not reuse the namespace object, making an identical copy
ns = self.add_namespace(Namespace(namespace.prefix, namespace.uri))
# minting the same Qualified Name from the namespace's copy
new_qname = ns[qname.localpart]
# returning the new qname
return new_qname
# Trying to generate a valid qualified name from here
if not isinstance(qname, (str, Identifier)):
# Only proceed with a string or URI value
return None
# Extract the URI string value if it is an identifier
str_value = qname.uri if isinstance(qname, Identifier) else qname
if str_value.startswith("_:"):
# this is a blank node ID
return None
elif ":" in str_value:
# check if the identifier contains a registered prefix
prefix, local_part = str_value.split(":", 1)
if prefix in self:
# return a new QualifiedName
return self[prefix][local_part]
if prefix in self._prefix_renamed_map:
# return a new QualifiedName
return self._prefix_renamed_map[prefix][local_part]
else:
# assuming it is a URI (with the first part as its scheme)
# check if the URI can be compacted by any of the registered namespaces
for namespace in self.values():
if str_value.startswith(namespace.uri):
# create a QName with the namespace
return namespace[str_value.replace(namespace.uri, "")]
elif self._default and isinstance(qname, str):
# no colon in the identifier and a default namespace is defined,
# create and return a qualified name in the default namespace
return self._default[qname]
if self.parent:
# all attempts have failed so far
# now delegate this to the parent NamespaceManager
return self.parent.valid_qualified_name(qname)
# Default to FAIL
return None
[docs]
def get_anonymous_identifier(self, local_prefix: str = "id") -> Identifier:
"""
Returns an anonymous identifier (without a namespace prefix).
:param local_prefix: Optional local namespace prefix as a string
(default: 'id').
:return: :py:class:`~prov.identifier.Identifier`
"""
self._anon_id_count += 1
return Identifier("_:%s%d" % (local_prefix, self._anon_id_count))
def _get_unused_prefix(self, original_prefix: str) -> str:
if original_prefix not in self:
return original_prefix
count = 1
while True:
new_prefix = "_".join((original_prefix, str(count)))
if new_prefix in self:
count += 1
else:
return new_prefix
[docs]
class ProvBundle(object):
"""PROV Bundle"""
def __init__(
self,
records: Optional[Iterable[ProvRecord]] = None,
identifier: Optional[QualifiedName] = None,
namespaces: Optional[NSCollection] = None,
document: Optional["ProvDocument"] = None,
):
"""
Constructor.
:param records: Optional iterable of records to add to the bundle
(default: None).
:param identifier: Optional identifier of the bundle (default: None).
:param namespaces: Optional iterable of :py:class:`~prov.identifier.Namespace`s
to set the document up with (default: None).
:param document: Optional document to add to the bundle (default: None).
"""
# Initializing bundle-specific attributes
self._identifier = identifier
self._records = list() # type: list[ProvRecord]
self._id_map = defaultdict(list) # type: dict[QualifiedName, list[ProvRecord]]
self._document = document
self._namespaces = NamespaceManager(
namespaces, parent=(document._namespaces if document is not None else None)
) # type: NamespaceManager
if records:
for record in records:
self.add_record(record)
def __repr__(self) -> str:
return "<%s: %s>" % (self.__class__.__name__, self._identifier)
@property
def namespaces(self) -> set[Namespace]:
"""
Returns the set of registered namespaces.
:return: Set of :py:class:`~prov.identifier.Namespace`.
"""
return set(self._namespaces.get_registered_namespaces())
@property
def default_ns_uri(self) -> str | None:
"""
Returns the default namespace's URI, if any.
:return: URI as string.
"""
default_ns = self._namespaces.get_default_namespace()
return default_ns.uri if default_ns else None
@property
def document(self) -> ProvDocument | None:
"""
Returns the parent document, if any.
:return: :py:class:`ProvDocument`.
"""
return self._document
@property
def identifier(self) -> QualifiedName | None:
"""
Returns the bundle's identifier
"""
return self._identifier
@property
def records(self) -> list[ProvRecord]:
"""
Returns the list of all records in the current bundle
"""
return list(self._records)
# Bundle configurations
[docs]
def set_default_namespace(self, uri: str) -> None:
"""
Sets the default namespace through a given URI.
:param uri: Namespace URI.
"""
self._namespaces.set_default_namespace(uri)
[docs]
def get_default_namespace(self) -> Namespace | None:
"""
Returns the default namespace.
:return: :py:class:`~prov.identifier.Namespace`
"""
return self._namespaces.get_default_namespace()
[docs]
def add_namespace(
self, namespace_or_prefix: Namespace | str, uri: Optional[str] = None
) -> Namespace:
"""
Adds a namespace (if not available, yet).
:param namespace_or_prefix: :py:class:`~prov.identifier.Namespace` or its
prefix as a string to add.
:param uri: Namespace URI (default: None). Must be present if only a
prefix is given in the previous parameter.
"""
if isinstance(namespace_or_prefix, Namespace):
return self._namespaces.add_namespace(namespace_or_prefix)
else:
if uri is not None:
return self._namespaces.add_namespace(
Namespace(namespace_or_prefix, uri)
)
else:
raise ProvException("Cannot add a namespace without a URI")
[docs]
def get_registered_namespaces(self) -> Iterable[Namespace]:
"""
Returns all registered namespaces.
:return: Iterable of :py:class:`~prov.identifier.Namespace`.
"""
return self._namespaces.get_registered_namespaces()
[docs]
def valid_qualified_name(
self, identifier: QualifiedNameCandidate
) -> Optional[QualifiedName]:
return self._namespaces.valid_qualified_name(identifier)
[docs]
def mandatory_valid_qname(
self, identifier: QualifiedNameCandidate
) -> QualifiedName:
"""
Determines if the given identifier is a valid qualified name and returns it.
If the provided identifier is not valid, an exception is raised.
"""
valid_qname = self.valid_qualified_name(identifier)
if valid_qname is not None:
return valid_qname
else:
raise ProvExceptionInvalidQualifiedName(identifier)
[docs]
def get_records(
self, class_or_type_or_tuple: Optional[type | tuple[type]] = None
) -> Iterable[ProvRecord]:
"""
Returns all records. Returned records may be filtered by the optional
argument.
:param class_or_type_or_tuple: A filter on the type for which records are
to be returned (default: None). The filter checks by the type of the
record using the `isinstance` check on the record.
:return: List of :py:class:`ProvRecord` objects.
"""
results = list(self._records) # make a (shallow) copy of the record list
if class_or_type_or_tuple:
return filter(lambda rec: isinstance(rec, class_or_type_or_tuple), results)
else:
return results
[docs]
def get_record(self, identifier: QualifiedNameCandidate) -> list[ProvRecord]:
"""
Returns one or more records matching a given identifier.
:param identifier: Record identifier.
:return: List of :py:class:`ProvRecord`
"""
valid_id = self.valid_qualified_name(identifier)
return list(self._id_map[valid_id]) if valid_id is not None else []
# Miscellaneous functions
[docs]
def is_document(self) -> bool:
"""
`True` if the object is a document, `False` otherwise.
:return: bool
"""
return False
[docs]
def is_bundle(self) -> bool:
"""
`True` if the object is a bundle, `False` otherwise.
:return: bool
"""
return True
[docs]
def has_bundles(self) -> bool:
"""
`True` if the object has at least one bundle, `False` otherwise.
:return: bool
"""
return False
@property
def bundles(self) -> Iterable[ProvBundle]:
"""
Returns bundles contained in the document
:return: Iterable of :py:class:`ProvBundle`.
"""
raise ProvException("A PROV bundle does not contain sub-bundles")
[docs]
def get_provn(self, _indent_level: int = 0) -> str:
"""
Returns the PROV-N representation of the bundle.
:return: String
"""
indentation = "" + (" " * _indent_level)
newline = "\n" + (" " * (_indent_level + 1))
# if this is the document, start the document;
# otherwise, start the bundle
lines = ["document"] if self.is_document() else ["bundle %s" % self._identifier]
default_namespace = self._namespaces.get_default_namespace()
if default_namespace:
lines.append("default <%s>" % default_namespace.uri)
registered_namespaces = self._namespaces.get_registered_namespaces()
if registered_namespaces:
lines.extend(
[
"prefix %s <%s>" % (namespace.prefix, namespace.uri)
for namespace in registered_namespaces
]
)
if default_namespace or registered_namespaces:
# a blank line between the prefixes and the assertions
lines.append("")
# adding all the records
lines.extend([record.get_provn() for record in self._records])
if self.is_document():
# Print out bundles
lines.extend(bundle.get_provn(_indent_level + 1) for bundle in self.bundles)
provn_str = newline.join(lines) + "\n"
# closing the structure
provn_str += indentation + (
"endDocument" if self.is_document() else "endBundle"
)
return provn_str
def __eq__(self, other: Any) -> bool:
if not isinstance(other, ProvBundle):
return False
other_records = set(other.get_records())
this_records = set(self.get_records())
if len(this_records) != len(other_records):
return False
# check if all records for equality
for record_a in this_records:
# Manually look for the record
found = False
for record_b in other_records:
if record_a == record_b:
other_records.remove(record_b)
found = True
break
if not found:
logger.debug(
"Equality (ProvBundle): Could not find this record: %s",
str(record_a),
)
return False
return True
def __ne__(self, other: Any) -> bool:
return not (self == other)
__hash__ = None # type: ignore
# type: ignore # type: ignore # Transformations
def _unified_records(self) -> list[ProvRecord]:
"""Returns a list of unified records."""
# TODO: Check unification rules in the PROV-CONSTRAINTS document
# This method simply merges the records having the same name
merged_records = dict()
for identifier, records in self._id_map.items():
if len(records) > 1:
# more than one record having the same identifier
# merge the records
merged = records[0].copy()
for record in records[1:]:
merged.add_attributes(record.attributes)
# map all of them to the merged record
for record in records:
merged_records[record] = merged
if not merged_records:
# No merging done, just return the list of original records
return list(self._records)
added_merged_records = set()
unified_records = list()
for record in self._records:
if record in merged_records:
merged = merged_records[record]
if merged not in added_merged_records:
unified_records.append(merged)
added_merged_records.add(merged)
else:
# add the original record
unified_records.append(record)
return unified_records
[docs]
def unified(self) -> ProvBundle:
"""
Unifies all records in the bundle that haves same identifiers
:returns: :py:class:`ProvBundle` -- the new unified bundle.
"""
unified_records = self._unified_records()
bundle = ProvBundle(records=unified_records, identifier=self.identifier)
return bundle
[docs]
def update(self, other: ProvBundle) -> None:
"""
Append all the records of the *other* ProvBundle into this bundle.
:param other: the other bundle whose records to be appended.
:type other: :py:class:`ProvBundle`
:returns: None.
"""
if isinstance(other, ProvBundle):
if other.is_document() and other.has_bundles():
# Cannot add bundles to a bundle
raise ProvException(
"ProvBundle.update(): The other bundle is a document with "
"sub-bundle(s)."
)
for record in other.get_records():
self.add_record(record)
else:
raise ProvException(
"ProvBundle.update(): The other bundle is not a ProvBundle "
"instance (%s)" % type(other)
)
# Provenance statements
def _add_record(self, record: ProvRecord) -> None:
# IMPORTANT: All records need to be added to a bundle/document via this
# method. Otherwise, the _id_map dict will not be correctly updated
identifier = record.identifier
if identifier is not None:
self._id_map[identifier].append(record)
self._records.append(record)
[docs]
def new_record(
self,
record_type: QualifiedName,
identifier: OptionalID,
attributes: Optional[RecordAttributesArg] = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvRecord:
"""
Creates a new record.
:param record_type: Type of record (one of :py:const:`PROV_REC_CLS`).
:param identifier: Identifier for new record.
:param attributes: Attributes as a dictionary or list of tuples to be added
to the record optionally (default: None).
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
attr_list = [] # type: list[tuple[QualifiedNameCandidate, Any]]
if attributes:
if isinstance(attributes, dict):
attr_list.extend((attr, value) for attr, value in attributes.items())
else:
# expecting a list of attributes here
attr_list.extend(attributes)
if other_attributes:
attr_list.extend(
other_attributes.items()
if isinstance(other_attributes, dict)
else other_attributes
)
record_identifier = (
self.valid_qualified_name(identifier) if identifier else None
)
new_record = PROV_REC_CLS[record_type](self, record_identifier, attr_list)
self._add_record(new_record)
return new_record
[docs]
def add_record(self, record: ProvRecord) -> ProvRecord:
"""
Adds a new record that to the bundle.
:param record: :py:class:`ProvRecord` to be added.
"""
return self.new_record(
record.get_type(),
record.identifier,
record.formal_attributes,
record.extra_attributes,
)
[docs]
def entity(
self,
identifier: QualifiedNameCandidate,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvEntity:
"""
Creates a new entity.
:param identifier: Identifier for new entity.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
return self.new_record(PROV_ENTITY, identifier, None, other_attributes) # type: ignore
[docs]
def activity(
self,
identifier: QualifiedNameCandidate,
startTime: Optional[DatetimeOrStr] = None,
endTime: Optional[DatetimeOrStr] = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvActivity:
"""
Creates a new activity.
:param identifier: Identifier for new activity.
:param startTime: Optional start time for the activity (default: None).
Either a :py:class:`datetime.datetime` object or a string that can be
parsed by :py:func:`dateutil.parser`.
:param endTime: Optional start time for the activity (default: None).
Either a :py:class:`datetime.datetime` object or a string that can be
parsed by :py:func:`dateutil.parser`.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
attributes = {
PROV_ATTR_STARTTIME: _ensure_datetime(startTime),
PROV_ATTR_ENDTIME: _ensure_datetime(endTime),
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_ACTIVITY,
identifier,
attributes,
other_attributes,
) # type: ignore
[docs]
def generation(
self,
entity: EntityRef,
activity: Optional[ActivityRef] = None,
time: Optional[DatetimeOrStr] = None,
identifier: OptionalID = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvRecord:
"""
Creates a new generation record for an entity.
:param entity: Entity or a string identifier for the entity.
:param activity: Activity or string identifier of the activity involved in
the generation (default: None).
:param time: Optional time for the generation (default: None).
Either a :py:class:`datetime.datetime` object or a string that can be
parsed by :py:func:`dateutil.parser`.
:param identifier: Identifier for new generation record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
attributes = {
PROV_ATTR_ENTITY: entity,
PROV_ATTR_ACTIVITY: activity,
PROV_ATTR_TIME: _ensure_datetime(time),
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_GENERATION,
identifier,
attributes,
other_attributes,
)
[docs]
def usage(
self,
activity: ActivityRef,
entity: Optional[EntityRef] = None,
time: Optional[DatetimeOrStr] = None,
identifier: OptionalID = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvUsage:
"""
Creates a new usage record for an activity.
:param activity: Activity or a string identifier for the entity.
:param entity: Entity or string identifier of the entity involved in
the usage relationship (default: None).
:param time: Optional time for the usage (default: None).
Either a :py:class:`datetime.datetime` object or a string that can be
parsed by :py:func:`dateutil.parser`.
:param identifier: Identifier for new usage record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
attributes = {
PROV_ATTR_ACTIVITY: activity,
PROV_ATTR_ENTITY: entity,
PROV_ATTR_TIME: _ensure_datetime(time),
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_USAGE,
identifier,
attributes,
other_attributes,
) # type: ignore
[docs]
def start(
self,
activity: ActivityRef,
trigger: Optional[EntityRef] = None,
starter: Optional[ActivityRef] = None,
time: Optional[DatetimeOrStr] = None,
identifier: OptionalID = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvStart:
"""
Creates a new start record for an activity.
:param activity: Activity or a string identifier for the entity.
:param trigger: Entity triggering the start of this activity.
:param starter: Optionally extra activity to state a qualified start
through which the trigger entity for the start is generated
(default: None).
:param time: Optional time for the start (default: None).
Either a :py:class:`datetime.datetime` object or a string that can be
parsed by :py:func:`dateutil.parser`.
:param identifier: Identifier for new start record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
attributes = {
PROV_ATTR_ACTIVITY: activity,
PROV_ATTR_TRIGGER: trigger,
PROV_ATTR_STARTER: starter,
PROV_ATTR_TIME: _ensure_datetime(time),
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_START,
identifier,
attributes,
other_attributes,
) # type: ignore
[docs]
def end(
self,
activity: ActivityRef,
trigger: Optional[EntityRef] = None,
ender: Optional[ActivityRef] = None,
time: Optional[DatetimeOrStr] = None,
identifier: OptionalID = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvEnd:
"""
Creates a new end record for an activity.
:param activity: Activity or a string identifier for the entity.
:param trigger: trigger: Entity triggering the end of this activity.
:param ender: Optionally extra activity to state a qualified end
through which the trigger entity for the end is generated
(default: None).
:param time: Optional time for the end (default: None).
Either a :py:class:`datetime.datetime` object or a string that can be
parsed by :py:func:`dateutil.parser`.
:param identifier: Identifier for new end record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
attributes = {
PROV_ATTR_ACTIVITY: activity,
PROV_ATTR_TRIGGER: trigger,
PROV_ATTR_ENDER: ender,
PROV_ATTR_TIME: _ensure_datetime(time),
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_END,
identifier,
attributes,
other_attributes,
) # type: ignore
[docs]
def invalidation(
self,
entity: EntityRef,
activity: Optional[ActivityRef] = None,
time: Optional[DatetimeOrStr] = None,
identifier: OptionalID = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvInvalidation:
"""
Creates a new invalidation record for an entity.
:param entity: Entity or a string identifier for the entity.
:param activity: Activity or string identifier of the activity involved in
the invalidation (default: None).
:param time: Optional time for the invalidation (default: None).
Either a :py:class:`datetime.datetime` object or a string that can be
parsed by :py:func:`dateutil.parser`.
:param identifier: Identifier for the new invalidation record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
attributes = {
PROV_ATTR_ENTITY: entity,
PROV_ATTR_ACTIVITY: activity,
PROV_ATTR_TIME: _ensure_datetime(time),
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_INVALIDATION,
identifier,
attributes,
other_attributes,
) # type: ignore
[docs]
def communication(
self,
informed: ActivityRef,
informant: ActivityRef,
identifier: OptionalID = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvCommunication:
"""
Creates a new communication record for an entity.
:param informed: The informed activity (relationship destination).
:param informant: The informing activity (relationship source).
:param identifier: Identifier for new communication record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
attributes = {
PROV_ATTR_INFORMED: informed,
PROV_ATTR_INFORMANT: informant,
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_COMMUNICATION,
identifier,
attributes,
other_attributes,
) # type: ignore
[docs]
def agent(
self,
identifier: QualifiedNameCandidate,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvAgent:
"""
Creates a new agent.
:param identifier: Identifier for new agent.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
return self.new_record(PROV_AGENT, identifier, None, other_attributes) # type: ignore
[docs]
def attribution(
self,
entity: EntityRef,
agent: AgentRef,
identifier: OptionalID = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvAttribution:
"""
Creates a new attribution record between an entity and an agent.
:param entity: Entity or a string identifier for the entity (relationship
source).
:param agent: Agent or string identifier of the agent involved in the
attribution (relationship destination).
:param identifier: Identifier for new attribution record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
attributes = {
PROV_ATTR_ENTITY: entity,
PROV_ATTR_AGENT: agent,
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_ATTRIBUTION,
identifier,
attributes,
other_attributes,
) # type: ignore
[docs]
def association(
self,
activity: ActivityRef,
agent: Optional[AgentRef] = None,
plan: Optional[EntityRef] = None,
identifier: OptionalID = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvAssociation:
"""
Creates a new association record for an activity.
:param activity: Activity or a string identifier for the activity.
:param agent: Agent or string identifier of the agent involved in the
association (default: None).
:param plan: Optionally extra entity to state qualified association through
an internal plan (default: None).
:param identifier: Identifier for new association record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
attributes = {
PROV_ATTR_ACTIVITY: activity,
PROV_ATTR_AGENT: agent,
PROV_ATTR_PLAN: plan,
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_ASSOCIATION,
identifier,
attributes,
other_attributes,
) # type: ignore
[docs]
def delegation(
self,
delegate: AgentRef,
responsible: AgentRef,
activity: Optional[ActivityRef] = None,
identifier: OptionalID = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvDelegation:
"""
Creates a new delegation record on behalf of an agent.
:param delegate: Agent delegating the responsibility (relationship source).
:param responsible: Agent the responsibility is delegated to (relationship
destination).
:param activity: Optionally extra activity to state qualified delegation
internally (default: None).
:param identifier: Identifier for new association record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
attributes = {
PROV_ATTR_DELEGATE: delegate,
PROV_ATTR_RESPONSIBLE: responsible,
PROV_ATTR_ACTIVITY: activity,
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_DELEGATION,
identifier,
attributes,
other_attributes,
) # type: ignore
[docs]
def influence(
self,
influencee: EntityRef | ActivityRef | AgentRef,
influencer: EntityRef | ActivityRef | AgentRef,
identifier: OptionalID = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvInfluence:
"""
Creates a new influence record between two entities, activities or agents.
:param influencee: Influenced entity, activity or agent (relationship
source).
:param influencer: Influencing entity, activity or agent (relationship
destination).
:param identifier: Identifier for new influence record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
attributes = {
PROV_ATTR_INFLUENCEE: influencee,
PROV_ATTR_INFLUENCER: influencer,
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_INFLUENCE,
identifier,
attributes,
other_attributes,
) # type: ignore
[docs]
def derivation(
self,
generatedEntity: EntityRef,
usedEntity: EntityRef,
activity: Optional[ActivityRef] = None,
generation: Optional[GenrationRef] = None,
usage: Optional[UsageRef] = None,
identifier: OptionalID = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvDerivation:
"""
Creates a new derivation record for a generated entity from a used entity.
:param generatedEntity: Entity or a string identifier for the generated
entity (relationship source).
:param usedEntity: Entity or a string identifier for the used entity
(relationship destination).
:param activity: Activity or string identifier of the activity involved in
the derivation (default: None).
:param generation: Optionally extra activity to state qualified generation
through a generation (default: None).
:param usage: XXX (default: None).
:param identifier: Identifier for new derivation record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
attributes = {
PROV_ATTR_GENERATED_ENTITY: generatedEntity,
PROV_ATTR_USED_ENTITY: usedEntity,
PROV_ATTR_ACTIVITY: activity,
PROV_ATTR_GENERATION: generation,
PROV_ATTR_USAGE: usage,
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_DERIVATION, identifier, attributes, other_attributes
) # type: ignore
[docs]
def revision(
self,
generatedEntity: EntityRef,
usedEntity: EntityRef,
activity: Optional[ActivityRef] = None,
generation: Optional[GenrationRef] = None,
usage: Optional[UsageRef] = None,
identifier: OptionalID = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvDerivation:
"""
Creates a new revision record for a generated entity from a used entity.
:param generatedEntity: Entity or a string identifier for the generated
entity (relationship source).
:param usedEntity: Entity or a string identifier for the used entity
(relationship destination).
:param activity: Activity or string identifier of the activity involved in
the revision (default: None).
:param generation: Optionally to state qualified revision through a
generation activity (default: None).
:param usage: XXX (default: None).
:param identifier: Identifier for new revision record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
record = self.derivation(
generatedEntity,
usedEntity,
activity,
generation,
usage,
identifier,
other_attributes,
)
record.add_asserted_type(PROV["Revision"])
return record
[docs]
def quotation(
self,
generatedEntity: EntityRef,
usedEntity: EntityRef,
activity: Optional[ActivityRef] = None,
generation: Optional[GenrationRef] = None,
usage: Optional[UsageRef] = None,
identifier: OptionalID = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvDerivation:
"""
Creates a new quotation record for a generated entity from a used entity.
:param generatedEntity: Entity or a string identifier for the generated
entity (relationship source).
:param usedEntity: Entity or a string identifier for the used entity
(relationship destination).
:param activity: Activity or string identifier of the activity involved in
the quotation (default: None).
:param generation: Optionally to state qualified quotation through a
generation activity (default: None).
:param usage: XXX (default: None).
:param identifier: Identifier for new quotation record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
record = self.derivation(
generatedEntity,
usedEntity,
activity,
generation,
usage,
identifier,
other_attributes,
)
record.add_asserted_type(PROV["Quotation"])
return record
[docs]
def primary_source(
self,
generatedEntity: EntityRef,
usedEntity: EntityRef,
activity: Optional[ActivityRef] = None,
generation: Optional[GenrationRef] = None,
usage: Optional[UsageRef] = None,
identifier: OptionalID = None,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvDerivation:
"""
Creates a new primary source record for a generated entity from a used
entity.
:param generatedEntity: Entity or a string identifier for the generated
entity (relationship source).
:param usedEntity: Entity or a string identifier for the used entity
(relationship destination).
:param activity: Activity or string identifier of the activity involved in
the primary source (default: None).
:param generation: Optionally to state qualified primary source through a
generation activity (default: None).
:param usage: XXX (default: None).
:param identifier: Identifier for new primary source record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
record = self.derivation(
generatedEntity,
usedEntity,
activity,
generation,
usage,
identifier,
other_attributes,
)
record.add_asserted_type(PROV["PrimarySource"])
return record # type: ignore
[docs]
def specialization(
self, specificEntity: EntityRef, generalEntity: EntityRef
) -> ProvSpecialization:
"""
Creates a new specialisation record for a specific from a general entity.
:param specificEntity: Entity or a string identifier for the specific
entity (relationship source).
:param generalEntity: Entity or a string identifier for the general entity
(relationship destination).
"""
attributes = {
PROV_ATTR_SPECIFIC_ENTITY: specificEntity,
PROV_ATTR_GENERAL_ENTITY: generalEntity,
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_SPECIALIZATION,
None,
attributes,
) # type: ignore
[docs]
def alternate(self, alternate1: EntityRef, alternate2: EntityRef) -> ProvAlternate:
"""
Creates a new alternate record between two entities.
:param alternate1: Entity or a string identifier for the first entity
(relationship source).
:param alternate2: Entity or a string identifier for the second entity
(relationship destination).
"""
attributes = {
PROV_ATTR_ALTERNATE1: alternate1,
PROV_ATTR_ALTERNATE2: alternate2,
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_ALTERNATE,
None,
attributes,
) # type: ignore
[docs]
def mention(
self, specificEntity: EntityRef, generalEntity: EntityRef, bundle: EntityRef
) -> ProvMention:
"""
Creates a new mention record for a specific from a general entity.
:param specificEntity: Entity or a string identifier for the specific
entity (relationship source).
:param generalEntity: Entity or a string identifier for the general entity
(relationship destination).
:param bundle: XXX
"""
attributes = {
PROV_ATTR_SPECIFIC_ENTITY: specificEntity,
PROV_ATTR_GENERAL_ENTITY: generalEntity,
PROV_ATTR_BUNDLE: bundle,
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_MENTION,
None,
attributes,
) # type: ignore
[docs]
def collection(
self,
identifier: QualifiedNameCandidate,
other_attributes: Optional[RecordAttributesArg] = None,
) -> ProvEntity:
"""
Creates a new collection record for a particular record.
:param identifier: Identifier for new collection record.
:param other_attributes: Optional other attributes as a dictionary or list
of tuples to be added to the record optionally (default: None).
"""
record = self.new_record(PROV_ENTITY, identifier, None, other_attributes)
record.add_asserted_type(PROV["Collection"])
return record # type: ignore
[docs]
def membership(self, collection: EntityRef, entity: EntityRef) -> ProvMembership:
"""
Creates a new membership record for an entity to a collection.
:param collection: Collection the entity is to be added to.
:param entity: Entity to be added to the collection.
"""
attributes = {
PROV_ATTR_COLLECTION: collection,
PROV_ATTR_ENTITY: entity,
} # type: dict[QualifiedNameCandidate, Any]
return self.new_record(
PROV_MEMBERSHIP,
None,
attributes,
) # type: ignore
[docs]
def plot(
self,
filename: Optional[PathLike] = None,
show_nary: bool = True,
use_labels: bool = False,
show_element_attributes: bool = True,
show_relation_attributes: bool = True,
) -> None:
"""
Convenience function to plot a PROV document.
:param filename: The filename to save to. If not given, it will open
an interactive matplotlib plot. The filetype is determined from
the filename ending.
:type filename: String
:param show_nary: Shows all elements in n-ary relations.
:type show_nary: bool
:param use_labels: Uses the `prov:label` property of an element as its
name (instead of its identifier).
:type use_labels: bool
:param show_element_attributes: Shows attributes of elements.
:type show_element_attributes: bool
:param show_relation_attributes: Shows attributes of relations.
:type show_relation_attributes: bool
"""
# Lazy imports to have soft dependencies on pydot and matplotlib
# (imported even later).
from prov import dot
if filename:
format = str(os.path.splitext(filename))[-1].lower().strip(os.path.extsep)
else:
format = "png"
format = format.lower()
d = dot.prov_to_dot(
self,
show_nary=show_nary,
use_labels=use_labels,
show_element_attributes=show_element_attributes,
show_relation_attributes=show_relation_attributes,
)
method = "create_%s" % format
if not hasattr(d, method):
raise ValueError("Format '%s' cannot be saved." % format)
with io.BytesIO() as buf:
buf.write(getattr(d, method)())
buf.seek(0, 0)
if filename:
with open(filename, "wb") as fh:
fh.write(buf.read())
else:
# Use matplotlib to show the image as it likely is more
# widespread than PIL and works nicely in the ipython notebook.
import matplotlib.pylab as plt # type: ignore
import matplotlib.image as mpimg # type: ignore
max_size = 30
img = mpimg.imread(buf)
# pydot makes a border around the image. remove it.
img = img[1:-1, 1:-1]
size = (img.shape[1] / 100.0, img.shape[0] / 100.0)
if max(size) > max_size:
scale = max_size / max(size)
else:
scale = 1.0
size = (scale * size[0], scale * size[1])
plt.figure(figsize=size)
plt.subplots_adjust(bottom=0, top=1, left=0, right=1)
plt.xticks([])
plt.yticks([])
plt.imshow(img)
plt.axis("off")
plt.show()
# Aliases
wasGeneratedBy = generation
used = usage
wasStartedBy = start
wasEndedBy = end
wasInvalidatedBy = invalidation
wasInformedBy = communication
wasAttributedTo = attribution
wasAssociatedWith = association
actedOnBehalfOf = delegation
wasInfluencedBy = influence
wasDerivedFrom = derivation
wasRevisionOf = revision
wasQuotedFrom = quotation
hadPrimarySource = primary_source
alternateOf = alternate
specializationOf = specialization
mentionOf = mention
hadMember = membership
[docs]
class ProvDocument(ProvBundle):
"""Provenance Document."""
def __init__(
self,
records: Optional[Iterable[ProvRecord]] = None,
namespaces: Optional[NSCollection] = None,
):
"""
Constructor.
:param records: Optional records to add to the document (default: None).
:param namespaces: Optional iterable of :py:class:`~prov.identifier.Namespace`s
to set the document up with (default: None).
"""
ProvBundle.__init__(
self, records=records, identifier=None, namespaces=namespaces
)
self._bundles = dict() # type: dict[QualifiedName, ProvBundle]
def __repr__(self) -> str:
return "<ProvDocument>"
def __eq__(self, other: Any) -> bool:
if not isinstance(other, ProvDocument):
return False
# Comparing the documents' content
if not super(ProvDocument, self).__eq__(other):
return False
# Comparing the documents' bundles
for b_id, bundle in self._bundles.items():
if b_id not in other._bundles:
return False
other_bundle = other._bundles[b_id]
if bundle != other_bundle:
return False
# Everything is the same
return True
[docs]
def is_document(self) -> bool:
"""
`True` if the object is a document, `False` otherwise.
:return: bool
"""
return True
[docs]
def is_bundle(self) -> bool:
"""
`True` if the object is a bundle, `False` otherwise.
:return: bool
"""
return False
[docs]
def has_bundles(self) -> bool:
"""
`True` if the object has at least one bundle, `False` otherwise.
:return: bool
"""
return len(self._bundles) > 0
@property
def bundles(self) -> Iterable[ProvBundle]:
"""
Returns bundles contained in the document
:return: Iterable of :py:class:`ProvBundle`.
"""
return self._bundles.values()
# Transformations
[docs]
def flattened(self) -> ProvDocument:
"""
Flattens the document by moving all the records in its bundles up
to the document level.
:returns: :py:class:`ProvDocument` -- the (new) flattened document.
"""
if self._bundles:
# Creating a new document for all the records
new_doc = ProvDocument()
bundled_records = itertools.chain(
*[b.get_records() for b in self._bundles.values()]
)
for record in itertools.chain(self._records, bundled_records):
new_doc.add_record(record)
return new_doc
else:
# returning the same document
return self
[docs]
def unified(self) -> ProvDocument:
"""
Returns a new document containing all records having the same identifiers
unified (including those inside bundles).
:return: :py:class:`ProvDocument`
"""
document = ProvDocument(self._unified_records())
document._namespaces = self._namespaces
for bundle in self.bundles:
unified_bundle = bundle.unified()
document.add_bundle(unified_bundle)
return document
[docs]
def update(self, other: ProvBundle) -> None:
"""
Append all the records of the *other* document/bundle into this document.
Bundles having the same identifiers will be merged.
:param other: The other document/bundle whose records to be appended.
:type other: :py:class:`ProvDocument` or :py:class:`ProvBundle`
:returns: None.
"""
if isinstance(other, ProvBundle):
for record in other.get_records():
self.add_record(record)
if other.has_bundles():
for bundle in other.bundles:
bundle_id = bundle.identifier
assert bundle_id is not None
if bundle.identifier in self._bundles:
self._bundles[bundle.identifier].update(bundle)
else:
new_bundle = self.bundle(bundle_id)
new_bundle.update(bundle)
else:
raise ProvException(
"ProvDocument.update(): The other is not a ProvDocument or "
"ProvBundle instance (%s)" % type(other)
)
# Bundle operations
[docs]
def add_bundle(
self, bundle: ProvBundle, identifier: Optional[QualifiedName] = None
) -> None:
"""
Add a bundle to the current document.
:param bundle: The bundle to add to the document.
:type bundle: :py:class:`ProvBundle`
:param identifier: The (optional) identifier to use for the bundle
(default: None). If none given, use the identifier from the bundle
itself.
"""
if not isinstance(bundle, ProvBundle):
raise ProvException(
"Only a ProvBundle instance can be added as a bundle in a "
"ProvDocument."
)
if bundle.is_document():
if bundle.has_bundles():
raise ProvException(
"Cannot add a document with nested bundles as a bundle."
)
# Make it a new ProvBundle
new_bundle = ProvBundle(namespaces=bundle.namespaces)
new_bundle.update(bundle)
bundle = new_bundle
if identifier is None:
identifier = bundle.identifier
if not identifier:
raise ProvException("The provided bundle has no identifier")
# Link the bundle namespace manager to the document's
bundle._namespaces.parent = self._namespaces
valid_id = bundle.mandatory_valid_qname(identifier)
# IMPORTANT: Rewriting the bundle identifier for consistency
bundle._identifier = valid_id
if valid_id in self._bundles:
raise ProvException("A bundle with that identifier already exists")
self._bundles[valid_id] = bundle
bundle._document = self
[docs]
def bundle(self, identifier: QualifiedNameCandidate) -> ProvBundle:
"""
Returns a new bundle from the current document.
:param identifier: The identifier to use for the bundle.
:return: :py:class:`ProvBundle`
"""
if identifier is None:
raise ProvException(
"An identifier is required. Cannot create an unnamed bundle."
)
valid_id = self.valid_qualified_name(identifier)
if valid_id is None:
raise ProvException(
'The provided identifier "%s" is not valid' % identifier
)
if valid_id in self._bundles:
raise ProvException("A bundle with that identifier already exists")
b = ProvBundle(identifier=valid_id, document=self)
self._bundles[valid_id] = b
return b
# Serializing and deserializing
[docs]
def serialize(
self,
destination: Optional[io.IOBase | PathLike] = None,
format: str = "json",
**args: Any,
) -> str | None:
"""
Serialize the :py:class:`ProvDocument` to the destination.
Available serializers can be queried by the value of
`:py:attr:~prov.serializers.Registry.serializers` after loading them via
`:py:func:~prov.serializers.Registry.load_serializers()`.
:param destination: Stream object to serialize the output to. Default is
`None`, which serializes as a string.
:param format: Serialization format (default: 'json'), defaulting to
PROV-JSON.
:return: Serialization in a string if no destination was given,
None otherwise.
"""
serializer = serializers.get(format)(self)
if destination is None:
buffer = io.StringIO()
serializer.serialize(buffer, **args)
return buffer.getvalue()
if isinstance(destination, IOBase):
stream = destination
serializer.serialize(stream, **args)
else:
location = str(destination)
scheme, netloc, path, params, _query, fragment = urlparse(location)
if netloc != "":
print(
"WARNING: not saving as location " + "is not a local file reference"
)
return None
fd, name = tempfile.mkstemp()
stream = os.fdopen(fd, "wb")
serializer.serialize(stream, **args)
stream.close()
if hasattr(shutil, "move"):
shutil.move(name, path)
else:
shutil.copy(name, path)
os.remove(name)
return None
[docs]
@staticmethod
def deserialize(
source: Optional[io.IOBase | PathLike] = None,
content: Optional[str | bytes] = None,
format: str = "json",
**args: Any,
) -> ProvDocument:
"""
Deserialize the :py:class:`ProvDocument` from source (a stream or a
file path) or directly from a string content.
Available serializers can be queried by the value of
`:py:attr:~prov.serializers.Registry.serializers` after loading them via
`:py:func:~prov.serializers.Registry.load_serializers()`.
Note: Not all serializers support deserialization.
:param source: Stream object to deserialize the PROV document from
(default: None).
:param content: String to deserialize the PROV document from
(default: None).
:param format: Serialization format (default: 'json'), defaulting to
PROV-JSON.
:return: :py:class:`ProvDocument`
"""
serializer = serializers.get(format)()
if content is not None:
# io.StringIO only accepts unicode strings
stream = io.StringIO(
content if isinstance(content, str) else content.decode()
)
return serializer.deserialize(stream, **args)
if source is not None:
if isinstance(source, io.IOBase):
return serializer.deserialize(source, **args)
else:
with open(source) as f:
return serializer.deserialize(f, **args)
raise TypeError("Either source or content must be provided")
[docs]
def sorted_attributes(
element: QualifiedName, attributes: Iterable[NameValuePair]
) -> list[NameValuePair]:
"""
Helper function sorting attributes into the order required by PROV-XML.
:param element: The prov element used to derive the type and the
attribute order for the type.
:param attributes: The attributes to sort.
"""
attributes = list(attributes)
order = list(PROV_REC_CLS[element].FORMAL_ATTRIBUTES)
# Append label, location, role, type, and value attributes. This is
# universal amongst all elements.
order.extend([PROV_LABEL, PROV_LOCATION, PROV_ROLE, PROV_TYPE, PROV_VALUE])
# Sort function. The PROV XML specification talks about alphabetical
# sorting. We now interpret it as sorting by tag including the prefix
# first and then sorting by the text, also including the namespace
# prefix if given.
def sort_fct(x: NameValuePair) -> tuple[str, str]:
return str(x[0]), str(x[1].value if hasattr(x[1], "value") else x[1])
sorted_elements = []
for item in order:
this_type_list = []
for e in list(attributes):
if e[0] != item:
continue
this_type_list.append(e)
attributes.remove(e)
this_type_list.sort(key=sort_fct)
sorted_elements.extend(this_type_list)
# Add remaining attributes. According to the spec, the other attributes
# have a fixed alphabetical order.
attributes.sort(key=sort_fct)
sorted_elements.extend(attributes)
return sorted_elements