"""Module for representing UDS property annotations with support for raw and normalized formats.
This module provides classes for handling Universal Decompositional Semantics (UDS)
annotations in both raw (multi-annotator) and normalized (single-value) formats.
The main classes are:
- :class:`UDSAnnotation`: Abstract base class for all UDS annotations
- :class:`NormalizedUDSAnnotation`: Annotations with single normalized values and confidence scores
- :class:`RawUDSAnnotation`: Annotations preserving individual annotator responses
The module also provides:
- Type aliases for various annotation data structures (e.g., NodeAttributes, EdgeAttributes)
- Helper functions for working with nested defaultdicts
- Methods for loading annotations from JSON files and converting between formats
See Also
--------
decomp.semantics.uds.metadata : Metadata classes for UDS annotations
decomp.semantics.uds.graph : Graph structures for UDS annotations
"""
import json
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Iterator
from logging import warning
from os.path import basename, splitext
from typing import ClassVar, TextIO, cast, overload
from overrides import overrides
from .metadata import PrimitiveType, UDSAnnotationMetadata, UDSPropertyMetadata
from .types import AnnotatorValue as TypedAnnotatorValue
from .types import UDSSubspace
# Type aliases for annotation data structures.
# Each bare string following an alias documents that alias.
type NodeAttributes = dict[str, dict[str, dict[str, PrimitiveType]]]
"""Node attributes: node_id -> subspace -> property -> value."""
type EdgeAttributes = dict[tuple[str, ...], dict[str, dict[str, PrimitiveType]]]
"""Edge attributes: (source_id, target_id) -> subspace -> property -> value."""
type GraphNodeAttributes = dict[str, NodeAttributes]
"""Mapping from graph IDs to their node attributes."""
type GraphEdgeAttributes = dict[str, EdgeAttributes]
"""Mapping from graph IDs to their edge attributes."""
type NormalizedData = dict[str, dict[str, dict[str, PrimitiveType]]]
"""Normalized annotation data: subspace -> property -> {'value': val, 'confidence': conf}."""
# Raw annotation data for a single property. It has two entries, each keyed
# by annotator ID:
#   'value': {annotator_id: val}, 'confidence': {annotator_id: conf}
type RawPropertyData = dict[str, dict[str, PrimitiveType]]
"""Raw property data with per-annotator values and confidences."""
# NOTE(review): as written this alias has three str-keyed levels above
# RawPropertyData, one more than the "subspace -> property" layering its
# docstring describes, and it is structurally identical to RawNodeAttributes
# below. Confirm which shape is intended before relying on it.
type RawData = dict[str, dict[str, dict[str, RawPropertyData]]]
"""Raw annotation data: subspace -> property -> RawPropertyData."""
# Raw attribute types (used by RawUDSAnnotation).
type RawNodeAttributes = dict[str, dict[str, dict[str, RawPropertyData]]]
"""Raw node attributes with multi-annotator data."""
type RawEdgeAttributes = dict[tuple[str, ...], dict[str, dict[str, RawPropertyData]]]
"""Raw edge attributes with multi-annotator data."""
type GraphRawNodeAttributes = dict[str, RawNodeAttributes]
"""Mapping from graph IDs to their raw node attributes."""
type GraphRawEdgeAttributes = dict[str, RawEdgeAttributes]
"""Mapping from graph IDs to their raw edge attributes."""
# Types for the per-annotator indexes, which are five dict levels deep:
# annotator_id -> graph_id -> node/edge_id -> subspace -> property -> AnnotatorValue
# AnnotatorValue is re-exported from the types module under its local name so
# the rest of this module can refer to it without the "Typed" prefix.
AnnotatorValue = TypedAnnotatorValue
type NodeAnnotatorDict = dict[
str, dict[str, dict[str, dict[str, dict[str, AnnotatorValue]]]],
]
"""Nested dict for node annotations by annotator.
annotator -> graph -> node -> subspace -> property -> AnnotatorValue.
"""
type EdgeAnnotatorDict = dict[
str, dict[str, dict[tuple[str, ...], dict[str, dict[str, AnnotatorValue]]]],
]
"""Nested dict for edge annotations by annotator.
annotator -> graph -> edge -> subspace -> property -> AnnotatorValue.
"""
# Return types for the items() generators.
# BaseItemsReturn: iterator of (graph_id, (node_attrs, edge_attrs)) pairs.
type BaseItemsReturn = Iterator[
tuple[
str,
tuple[
dict[str, NormalizedData | RawData],
dict[tuple[str, ...], NormalizedData | RawData],
],
]
]
"""Return type for base items() method yielding (graph_id, (node_attrs, edge_attrs))."""
# Annotator-specific item types, narrower than the base type above.
# NodeItemsReturn: iterator of (graph_id, node -> subspace -> property -> AnnotatorValue).
type NodeItemsReturn = Iterator[
tuple[str, dict[str, dict[str, dict[str, AnnotatorValue]]]]
]
# EdgeItemsReturn: iterator of (graph_id, edge tuple -> subspace -> property -> AnnotatorValue).
type EdgeItemsReturn = Iterator[
tuple[str, dict[tuple[str, ...], dict[str, dict[str, AnnotatorValue]]]]
]
# Union of every shape RawUDSAnnotation.items() can yield.
type RawItemsReturn = NodeItemsReturn | EdgeItemsReturn | BaseItemsReturn
def _nested_defaultdict(depth: int) -> type[dict] | defaultdict:
"""Construct a nested defaultdict of specified depth.
The lowest nesting level (depth=0) is a normal dictionary.
Higher levels are defaultdicts that create nested structures.
Parameters
----------
depth : int
The depth of nesting. Must be non-negative.
Returns
-------
type[dict[str, AnnotatorValue]] | Callable[[], dict[str, AnnotatorValue]]
A dict constructor (depth=0) or defaultdict with nested structure
Raises
------
ValueError
If depth is negative
"""
if depth < 0:
raise ValueError("depth must be a nonnegative int")
if not depth:
return dict
return defaultdict(lambda: _nested_defaultdict(depth-1))
def _freeze_nested_defaultdict(d: dict | defaultdict) -> dict:
"""Convert nested defaultdict to regular dict recursively.
Parameters
----------
d : dict[str, NodeAnnotatorDict | EdgeAnnotatorDict | AnnotatorValue] | \
defaultdict[str, NodeAnnotatorDict | EdgeAnnotatorDict | AnnotatorValue]
The nested defaultdict to freeze
Returns
-------
dict[str, NodeAnnotatorDict | EdgeAnnotatorDict | AnnotatorValue]
Regular dict with all defaultdicts converted.
"""
d = dict(d)
for k, v in d.items():
if isinstance(v, dict | defaultdict):
d[k] = _freeze_nested_defaultdict(v)
return d
class UDSAnnotation(ABC):
    """A Universal Decompositional Semantics annotation.

    This is an abstract base class. See its RawUDSAnnotation and
    NormalizedUDSAnnotation subclasses.

    The ``__init__`` method for this class is abstract to ensure that
    it cannot be initialized directly, even though it is used by the
    subclasses and has a valid default implementation. The
    ``from_json`` class method is abstract to force the subclass to
    define more specific constraints on its JSON inputs.

    Parameters
    ----------
    metadata
        The metadata for the annotations.
    data
        A mapping from graph identifiers to node/edge identifiers to
        property subspaces to properties to annotations. Edge
        identifiers must be represented as NODEID1%%NODEID2, and node
        identifiers must not contain %%.
    """

    # Annotations already loaded by ``from_json``, keyed by the string that
    # was passed to it. The dict lives on this base class, so it is shared
    # by every subclass that does not define its own ``CACHE``.
    CACHE: ClassVar[dict[str, "UDSAnnotation"]] = {}

    @abstractmethod
    def __init__(self, metadata: UDSAnnotationMetadata,
                 data: dict[str, dict[str, NormalizedData | RawData]]):
        self._process_metadata(metadata)
        self._process_data(data)
        self._validate()

    def _process_metadata(self, metadata: UDSAnnotationMetadata) -> None:
        """Store annotation metadata.

        Parameters
        ----------
        metadata : UDSAnnotationMetadata
            The metadata to store
        """
        self._metadata = metadata

    def _process_data(self, data: dict[str, dict[str, NormalizedData | RawData]]) -> None:
        """Process annotation data into node and edge attributes.

        Parameters
        ----------
        data : dict[str, dict[str, NormalizedData | RawData]]
            Raw annotation data by graph ID
        """
        self._process_node_data(data)
        self._process_edge_data(data)

        self._graphids = set(data)

    def _process_node_data(self, data: dict[str, dict[str, NormalizedData | RawData]]) -> None:
        """Extract node attributes from annotation data.

        Node identifiers are those without '%%' separator.

        Parameters
        ----------
        data : dict[str, dict[str, NormalizedData | RawData]]
            Raw annotation data by graph ID
        """
        self._node_attributes: dict[str, dict[str, NormalizedData | RawData]] = {
            gid: {node: a
                  for node, a in attrs.items()
                  if "%%" not in node}
            for gid, attrs in data.items()
        }

        # some attributes are not property subspaces and are thus excluded
        self._excluded_attributes = {"subpredof", "subargof", "headof", "span", "head"}

        self._node_subspaces: set[UDSSubspace] = {
            cast(UDSSubspace, ss)
            for nodedict in self._node_attributes.values()
            for subspaces in nodedict.values()
            for ss in subspaces
            if ss not in self._excluded_attributes
        }

    def _process_edge_data(self, data: dict[str, dict[str, NormalizedData | RawData]]) -> None:
        """Extract edge attributes from annotation data.

        Edge identifiers contain '%%' separator between source and target;
        they are split on it and stored as tuples.

        Parameters
        ----------
        data : dict[str, dict[str, NormalizedData | RawData]]
            Raw annotation data by graph ID
        """
        self._edge_attributes: dict[str, dict[tuple[str, ...], NormalizedData | RawData]] = {
            gid: {tuple(edge.split("%%")): a
                  for edge, a in attrs.items()
                  if "%%" in edge}
            for gid, attrs in data.items()
        }

        self._edge_subspaces: set[UDSSubspace] = {
            cast(UDSSubspace, ss)
            for edgedict in self._edge_attributes.values()
            for subspaces in edgedict.values()
            for ss in subspaces
        }

    def _validate(self) -> None:
        """Validate annotation data consistency.

        Checks that:

        - Node and edge annotations have the same graph IDs
        - All data subspaces have associated metadata
        - Warns about metadata for missing subspaces

        Raises
        ------
        ValueError
            If validation fails
        """
        node_graphids = set(self._node_attributes)
        edge_graphids = set(self._edge_attributes)

        if node_graphids != edge_graphids:
            raise ValueError(
                "The graph IDs that nodes are specified for "
                "are not the same as those that the edges are. "
                "UDSAnnotation and its stock subclasses assume "
                "that node and edge annotations are specified "
                "for the same set of graph IDs. Unless you have "
                "subclassed UDSAnnotation or its subclasses, "
                "there is likely something going wrong. If "
                "you have subclassed it and your subclass does "
                "not require this assumption, you should override "
                "UDSAnnotation._validate",
            )

        subspaces = self._node_subspaces | self._edge_subspaces

        # metadata without data is tolerated, but worth flagging
        for ss in self._metadata.subspaces - subspaces:
            warning(
                f"The annotation metadata is specified for "
                f"subspace {ss}, which is not in the data."
            )

        # data without metadata cannot be interpreted
        missing = subspaces - self._metadata.subspaces

        if missing:
            raise ValueError(
                f'The following subspaces do not have associated '
                f'metadata: {",".join(sorted(missing))}'
            )

    def __getitem__(
        self, graphid: str,
    ) -> tuple[
        dict[str, NormalizedData | RawData],
        dict[tuple[str, ...], NormalizedData | RawData],
    ]:
        """Get node and edge attributes for a graph.

        Parameters
        ----------
        graphid : str
            The graph identifier.

        Returns
        -------
        tuple[dict[str, NormalizedData | RawData], dict[tuple[str, ...], NormalizedData | RawData]]
            Tuple of (node_attributes, edge_attributes) for the graph.

        Raises
        ------
        KeyError
            If graphid not found
        """
        node_attrs = self._node_attributes[graphid]
        edge_attrs = self._edge_attributes[graphid]

        return node_attrs, edge_attrs

    @classmethod
    @abstractmethod
    def from_json(cls, jsonfile: str | TextIO) -> "UDSAnnotation":
        """Load Universal Decompositional Semantics dataset from JSON.

        For node annotations, the format of the JSON passed to this
        class method must be:

        ::

            {GRAPHID_1: {NODEID_1_1: DATA,
                         ...},
             GRAPHID_2: {NODEID_2_1: DATA,
                         ...},
             ...
            }

        Edge annotations should be of the form:

        ::

            {GRAPHID_1: {NODEID_1_1%%NODEID_1_2: DATA,
                         ...},
             GRAPHID_2: {NODEID_2_1%%NODEID_2_2: DATA,
                         ...},
             ...
            }

        Graph and node identifiers must match the graph and node
        identifiers of the predpatt graphs to which the annotations
        will be added. The subclass determines the form of DATA in the
        above.

        The top level of the JSON must be an object with a "metadata"
        and a "data" field; any other top-level field is ignored with
        a warning.

        Parameters
        ----------
        jsonfile
            (path to) file containing annotations as JSON. A string
            ending in ``.json`` is opened as a path, any other string
            is parsed as JSON text, and anything else is read as an
            open file object.

        Returns
        -------
        UDSAnnotation
            An instance of ``cls``. Results for string inputs are
            stored in ``CACHE`` and reused on later calls.

        Raises
        ------
        ValueError
            If the JSON lacks the "metadata" or the "data" field
        """
        from_string = isinstance(jsonfile, str)

        if from_string:
            cached = cls.CACHE.get(jsonfile)

            # CACHE is shared between subclasses, so only reuse an entry
            # that really is an instance of the class being asked for
            if isinstance(cached, cls):
                return cached

        if from_string and splitext(basename(jsonfile))[-1] == ".json":
            # JSON text is UTF-8; do not depend on the platform default
            with open(jsonfile, encoding="utf-8") as infile:
                annotation = json.load(infile)

        elif from_string:
            annotation = json.loads(jsonfile)

        else:
            annotation = json.load(jsonfile)

        required = {"metadata", "data"}
        fields = set(annotation)

        # both required fields must be present, whatever else is there
        if not required <= fields:
            raise ValueError('annotation JSON must specify both "metadata" and "data"')

        extra = fields - required

        if extra:
            warning(
                f'ignoring the following fields in annotation JSON: '
                f'{", ".join(sorted(extra))}'
            )

        metadata = UDSAnnotationMetadata.from_dict(annotation["metadata"])

        result = cls(metadata, annotation["data"])

        if from_string:
            cls.CACHE[jsonfile] = result

        return result

    def items(self, annotation_type: str | None = None) -> BaseItemsReturn:
        """Dictionary-like items generator for attributes.

        If annotation_type is specified as "node" or "edge", this
        generator yields a graph identifier and its node or edge
        attributes (respectively); otherwise, this generator yields a
        graph identifier and a tuple of its node and edge attributes.

        Parameters
        ----------
        annotation_type
            ``None`` (default), "node", or "edge"

        Raises
        ------
        ValueError
            If annotation_type is not one of the values above. Because
            this is a generator, the error surfaces when iteration
            starts rather than when the method is called.
        """
        if annotation_type not in (None, "node", "edge"):
            raise ValueError('annotation_type must be None, "node", or "edge"')

        if annotation_type is None:
            for gid in self.graphids:
                yield gid, self[gid]

        elif annotation_type == "node":
            for gid in self.graphids:
                yield gid, self.node_attributes[gid]  # type: ignore[misc]

        else:
            for gid in self.graphids:
                yield gid, self.edge_attributes[gid]  # type: ignore[misc]

    @property
    def node_attributes(self) -> dict[str, dict[str, NormalizedData | RawData]]:
        """All node attributes by graph ID.

        Returns
        -------
        dict[str, dict[str, NormalizedData | RawData]]
            Mapping from graph ID to node ID to annotation data
        """
        return self._node_attributes

    @property
    def edge_attributes(self) -> dict[str, dict[tuple[str, ...], NormalizedData | RawData]]:
        """All edge attributes by graph ID.

        Returns
        -------
        dict[str, dict[tuple[str, ...], NormalizedData | RawData]]
            Mapping from graph ID to edge tuple to annotation data
        """
        return self._edge_attributes

    @property
    def graphids(self) -> set[str]:
        """Set of all graph identifiers with annotations.

        Returns
        -------
        set[str]
            The graph IDs that appear as top-level keys of the data
        """
        return self._graphids

    @property
    def node_graphids(self) -> set[str]:
        """Set of graph identifiers with node annotations.

        Returns
        -------
        set[str]
            Graph IDs that are keys of the node attributes
        """
        return set(self.node_attributes)

    @property
    def edge_graphids(self) -> set[str]:
        """Set of graph identifiers with edge annotations.

        Returns
        -------
        set[str]
            Graph IDs that are keys of the edge attributes
        """
        return set(self.edge_attributes)

    @property
    def metadata(self) -> UDSAnnotationMetadata:
        """The metadata for all annotations.

        Returns
        -------
        UDSAnnotationMetadata
            The metadata object passed at construction
        """
        return self._metadata

    @property
    def node_subspaces(self) -> set[UDSSubspace]:
        """Set of subspaces used in node annotations.

        Returns
        -------
        set[UDSSubspace]
            Subspace names excluding structural attributes
        """
        return self._node_subspaces

    @property
    def edge_subspaces(self) -> set[UDSSubspace]:
        """Set of subspaces used in edge annotations.

        Returns
        -------
        set[UDSSubspace]
            Subspace names for edges
        """
        return self._edge_subspaces

    @property
    def subspaces(self) -> set[UDSSubspace]:
        """Set of all subspaces (node and edge).

        Returns
        -------
        set[UDSSubspace]
            Union of node and edge subspaces
        """
        return self.node_subspaces | self.edge_subspaces

    def properties(self, subspace: UDSSubspace | None = None) -> set[str]:
        """Get properties for a subspace.

        This is a thin wrapper around the metadata's ``properties``.

        Parameters
        ----------
        subspace : str | None, optional
            Subspace to get properties for. If None, returns all properties.

        Returns
        -------
        set[str]
            Property names in the subspace
        """
        return self._metadata.properties(subspace)
class NormalizedUDSAnnotation(UDSAnnotation):
    """A normalized Universal Decompositional Semantics annotation.

    Each property in a NormalizedUDSAnnotation carries exactly one
    value and exactly one confidence, each of which is a ``str``,
    ``int``, or ``float``.

    Parameters
    ----------
    metadata
        The metadata for the annotations.
    data
        A mapping from graph identifiers to node/edge identifiers to
        property subspaces to property to value and confidence. Edge
        identifiers must be represented as NODEID1%%NODEID2, and node
        identifiers must not contain %%.
    """

    @overrides
    def __init__(self, metadata: UDSAnnotationMetadata,
                 data: dict[str, dict[str, dict[str, dict[str, PrimitiveType]]]]):
        # the cast only widens the static type to what the parent
        # signature declares; the object itself is passed through as-is
        super().__init__(
            metadata,
            cast(dict[str, dict[str, NormalizedData | RawData]], data),
        )

    def _validate(self) -> None:
        """Run the base validation, then reject annotator metadata.

        Raises
        ------
        ValueError
            If the base validation fails or the metadata specifies
            annotators
        """
        super()._validate()

        if not self._metadata.has_annotators():
            return

        raise ValueError(
            "metadata for NormalizedUDSAnnotation should "
            "not specify annotators",
        )

    @classmethod
    @overrides
    def from_json(cls, jsonfile: str | TextIO) -> "NormalizedUDSAnnotation":
        """Load a dataset of normalized annotations from a JSON file.

        Loading itself is delegated to ``UDSAnnotation.from_json``.

        For node annotations, the format of the JSON passed to this
        class method must be:

        ::

            {GRAPHID_1: {NODEID_1_1: DATA,
                         ...},
             GRAPHID_2: {NODEID_2_1: DATA,
                         ...},
             ...
            }

        Edge annotations should be of the form:

        ::

            {GRAPHID_1: {NODEID_1_1%%NODEID_1_2: DATA,
                         ...},
             GRAPHID_2: {NODEID_2_1%%NODEID_2_2: DATA,
                         ...},
             ...
            }

        Graph and node identifiers must match the graph and node
        identifiers of the predpatt graphs to which the annotations
        will be added.

        DATA in the above is assumed to have the following
        structure:

        ::

            {SUBSPACE_1: {PROP_1_1: {'value': VALUE,
                                     'confidence': VALUE},
                          ...},
             SUBSPACE_2: {PROP_2_1: {'value': VALUE,
                                     'confidence': VALUE},
                          ...},
            }

        VALUE in the above is assumed to be unstructured.

        Parameters
        ----------
        jsonfile
            (path to) file containing annotations as JSON
        """
        loaded = super().from_json(jsonfile)

        return cast("NormalizedUDSAnnotation", loaded)
class RawUDSAnnotation(UDSAnnotation):
    """A raw Universal Decompositional Semantics dataset.

    Unlike :class:`decomp.semantics.uds.NormalizedUDSAnnotation`,
    objects of this class may have multiple annotations for a
    particular attribute. Each annotation is associated with an
    annotator ID, and different annotators may have annotated
    different numbers of items.

    Besides the graph-keyed attributes of the base class, instances
    expose ``node_attributes_by_annotator`` and
    ``edge_attributes_by_annotator``, plain nested dicts keyed
    annotator -> graph -> node/edge -> subspace -> property.

    Parameters
    ----------
    metadata
        The metadata for the annotations.
    data
        A mapping from graph identifiers to node/edge identifiers to
        property subspaces to property to value and confidence for
        each annotator. Edge identifiers must be represented as
        NODEID1%%NODEID2, and node identifiers must not contain %%.
    """

    @overrides
    def __init__(self, metadata: UDSAnnotationMetadata,
                 data: dict[str, dict[str, RawData]]):
        # the cast only widens the static type to what the parent
        # signature declares; the object itself is passed through as-is
        super().__init__(
            metadata,
            cast(dict[str, dict[str, NormalizedData | RawData]], data),
        )

    @staticmethod
    def _index_by_annotator(attributes: dict[str, dict],
                            excluded: set[str]) -> dict:
        """Re-key graph-keyed raw attributes by annotator.

        Parameters
        ----------
        attributes : dict[str, dict]
            graph -> node/edge -> subspace -> property -> annotation
        excluded : set[str]
            Subspace and property names to skip entirely

        Returns
        -------
        dict
            Plain nested dict: annotator -> graph -> node/edge ->
            subspace -> property -> AnnotatorValue. An annotator's
            response is included only if the annotation holds both a
            'value' dict and a 'confidence' dict and the annotator has
            a non-None confidence.
        """
        by_annotator = cast(defaultdict, _nested_defaultdict(5))

        for gid, items in attributes.items():
            for item_id, subspaces in items.items():
                for subspace, properties in subspaces.items():
                    # skip before touching ``properties``: excluded
                    # attributes need not be property mappings at all
                    if subspace in excluded:
                        continue

                    for prop, annotation in properties.items():
                        if prop in excluded or not isinstance(annotation, dict):
                            continue

                        values = annotation.get("value")
                        confidences = annotation.get("confidence")

                        if not (isinstance(values, dict) and isinstance(confidences, dict)):
                            continue

                        for annid, val in values.items():
                            conf = confidences.get(annid)

                            # a value without a confidence is dropped
                            if conf is None:
                                continue

                            by_annotator[annid][gid][item_id][subspace][prop] = AnnotatorValue(
                                confidence=cast(PrimitiveType, conf),
                                value=cast(PrimitiveType, val),
                            )

        # hand back plain dicts so later lookups cannot create entries
        return _freeze_nested_defaultdict(by_annotator)

    def _process_node_data(self, data: dict[str, dict[str, NormalizedData | RawData]]) -> None:
        """Extract node attributes and index them by annotator.

        The graph-keyed node attributes, excluded attributes and node
        subspaces are computed by the base class; this override adds
        ``node_attributes_by_annotator``.

        Parameters
        ----------
        data : dict[str, dict[str, NormalizedData | RawData]]
            Raw annotation data by graph ID
        """
        super()._process_node_data(data)

        self.node_attributes_by_annotator = cast(
            NodeAnnotatorDict,
            self._index_by_annotator(self._node_attributes, self._excluded_attributes),
        )

    def _process_edge_data(self, data: dict[str, dict[str, NormalizedData | RawData]]) -> None:
        """Extract edge attributes and index them by annotator.

        The graph-keyed edge attributes and edge subspaces are computed
        by the base class; this override adds
        ``edge_attributes_by_annotator``. Nothing is excluded for edges.

        Parameters
        ----------
        data : dict[str, dict[str, NormalizedData | RawData]]
            Raw annotation data by graph ID
        """
        super()._process_edge_data(data)

        self.edge_attributes_by_annotator = cast(
            EdgeAnnotatorDict,
            self._index_by_annotator(self._edge_attributes, set()),
        )

    @overrides
    def _validate(self) -> None:
        """Validate that raw annotations have annotators for all properties.

        Raises
        ------
        ValueError
            If the base validation fails or any property lacks
            annotator metadata
        """
        super()._validate()

        if not all(self._metadata.has_annotators(ss, p)
                   for ss in self._metadata.subspaces
                   for p in self._metadata.properties(ss)):
            raise ValueError(
                "metadata for RawUDSAnnotation should "
                "specify annotators for all subspaces and properties",
            )

    @classmethod
    @overrides
    def from_json(cls, jsonfile: str | TextIO) -> "RawUDSAnnotation":
        """Load a dataset for raw annotations from a JSON file.

        Loading itself is delegated to ``UDSAnnotation.from_json``.

        For node annotations, the format of the JSON passed to this
        class method must be:

        ::

            {GRAPHID_1: {NODEID_1_1: DATA,
                         ...},
             GRAPHID_2: {NODEID_2_1: DATA,
                         ...},
             ...
            }

        Edge annotations should be of the form:

        ::

            {GRAPHID_1: {NODEID_1_1%%NODEID_1_2: DATA,
                         ...},
             GRAPHID_2: {NODEID_2_1%%NODEID_2_2: DATA,
                         ...},
             ...
            }

        Graph and node identifiers must match the graph and node
        identifiers of the predpatt graphs to which the annotations
        will be added.

        DATA in the above is assumed to have the following
        structure:

        ::

            {SUBSPACE_1: {PROP_1_1: {'value': {
                                        ANNOTATOR1: VALUE1,
                                        ANNOTATOR2: VALUE2,
                                        ...
                                              },
                                     'confidence': {
                                        ANNOTATOR1: CONF1,
                                        ANNOTATOR2: CONF2,
                                        ...
                                                   }
                                    },
                          PROP_1_2: {'value': {
                                        ANNOTATOR1: VALUE1,
                                        ANNOTATOR2: VALUE2,
                                        ...
                                              },
                                     'confidence': {
                                        ANNOTATOR1: CONF1,
                                        ANNOTATOR2: CONF2,
                                        ...
                                                   }
                                    },
                          ...},
             SUBSPACE_2: {PROP_2_1: {'value': {
                                        ANNOTATOR3: VALUE1,
                                        ANNOTATOR4: VALUE2,
                                        ...
                                              },
                                     'confidence': {
                                        ANNOTATOR3: CONF1,
                                        ANNOTATOR4: CONF2,
                                        ...
                                                   }
                                    },
                          ...},
             ...}

        VALUEi and CONFi are assumed to be unstructured.

        Parameters
        ----------
        jsonfile
            (path to) file containing annotations as JSON
        """
        return cast("RawUDSAnnotation", super().from_json(jsonfile))

    def annotators(self, subspace: UDSSubspace | None = None,
                   prop: str | None = None) -> set[str] | None:
        """Get annotator IDs for a subspace and property.

        If neither subspace nor property are specified, all annotator
        IDs are returned. If only the subspace is specified, all
        annotator IDs for the subspace are returned.

        This is a thin wrapper around the metadata's ``annotators``.

        Parameters
        ----------
        subspace : str | None, optional
            The subspace to filter by
        prop : str | None, optional
            The property to filter by

        Returns
        -------
        set[str] | None
            Set of annotator IDs or None if no annotators found
        """
        return self._metadata.annotators(subspace, prop)

    @overload
    def items(self, annotation_type: str | None = None) -> BaseItemsReturn: ...

    @overload
    def items(self, annotation_type: str | None = None,
              annotator_id: str | None = None) -> RawItemsReturn: ...

    def items(self, annotation_type: str | None = None,
              annotator_id: str | None = None) -> RawItemsReturn:
        """Dictionary-like items generator for attributes.

        Without an annotator ID this method delegates to
        ``UDSAnnotation.items``. With an annotator ID it yields, for
        every graph identifier, that annotator's annotations for the
        graph; a graph the annotator did not annotate is yielded with
        an empty dict.

        Parameters
        ----------
        annotation_type
            Whether to return node annotations, edge annotations, or
            both (default)
        annotator_id
            The annotator whose annotations will be returned by the
            generator (defaults to all annotators)

        Raises
        ------
        ValueError
            If annotation_type is not None, "node", or "edge", or if
            both annotation_type and annotator_id are passed and the
            relevant annotator gives no annotations of the relevant
            type. Because this is a generator, the error surfaces when
            iteration starts rather than when the method is called.
        """
        if annotation_type not in (None, "node", "edge"):
            raise ValueError('annotation_type must be None, "node", or "edge"')

        if annotator_id is None:
            # no annotator filter: behave exactly like the parent class
            yield from super().items(annotation_type)
            return

        if annotation_type == "node":
            if annotator_id not in self.node_attributes_by_annotator:
                raise ValueError(f"{annotator_id} does not have associated node annotations")

            nodes_by_graph = self.node_attributes_by_annotator[annotator_id]

            for gid in self.graphids:
                # the per-annotator index is a plain dict, so a graph this
                # annotator never touched must be looked up with a default
                yield gid, nodes_by_graph.get(gid, {})

        elif annotation_type == "edge":
            if annotator_id not in self.edge_attributes_by_annotator:
                raise ValueError(
                    f"{annotator_id} does not have associated "
                    "edge annotations",
                )

            edges_by_graph = self.edge_attributes_by_annotator[annotator_id]

            for gid in self.graphids:
                yield gid, edges_by_graph.get(gid, {})

        else:
            # in the combined view an annotator missing from one (or both)
            # of the indexes contributes empty dicts rather than an error
            nodes_by_graph = self.node_attributes_by_annotator.get(annotator_id, {})
            edges_by_graph = self.edge_attributes_by_annotator.get(annotator_id, {})

            for gid in self.graphids:
                yield gid, (
                    cast(dict[str, NormalizedData | RawData],
                         nodes_by_graph.get(gid, {})),
                    cast(dict[tuple[str, ...], NormalizedData | RawData],
                         edges_by_graph.get(gid, {})),
                )