"""Graph representations for Universal Decompositional Semantics (UDS) annotations.
This module provides the core graph infrastructure for representing UDS annotations
at both sentence and document levels using NetworkX directed graphs. The graphs
capture syntactic structure, semantic predicates and arguments, and the interfaces
between them.
Key Components
--------------
Type Aliases
- :data:`NodeID`: String identifiers for graph nodes
- :data:`EdgeKey`: Tuples identifying edges between nodes
- :data:`NodeAttributes`, :data:`EdgeAttributes`: Dictionaries storing node/edge properties
- :data:`DomainType`: The domain a node belongs to ('syntax', 'semantics', 'document')
- :data:`NodeType`: The type of node ('token', 'predicate', 'argument', 'root')
- :data:`EdgeType`: The type of edge ('head', 'dependency', 'interface')
Classes
- :class:`UDSGraph`: Abstract base class providing core graph functionality
- :class:`UDSSentenceGraph`: Sentence-level graphs with syntax and semantics layers
- :class:`UDSDocumentGraph`: Document-level graphs connecting multiple sentences
The graphs use a consistent naming scheme where node IDs incorporate the graph name
and domain (e.g., 'ewt-001-1-syntax-1' for a syntax token). Edge attributes specify
the domain and type of relationship between nodes.
Features include SPARQL querying via RDF conversion, graph operations for finding
maximal/minimal nodes, extracting subgraphs by domain, and adding UDS annotations
to existing graph structures. The sentence graphs automatically add performative
nodes representing the speaker/addressee for discourse representation.
See Also
--------
decomp.semantics.uds.annotation : UDS annotation classes
decomp.semantics.uds.corpus : Corpus-level UDS graph collections
decomp.graph.nx : NetworkX graph utilities
"""
from abc import ABC, abstractmethod
from functools import cached_property, lru_cache
from logging import info, warning
from typing import TYPE_CHECKING, ClassVar, Literal
from networkx import DiGraph, adjacency_data, adjacency_graph
from overrides import overrides
from pyparsing import ParseException
from rdflib import Graph
from rdflib.plugins.sparql import prepareQuery
from rdflib.plugins.sparql.sparql import Query
from rdflib.query import Result
# import RDFConverter (need to check if it exists first)
if TYPE_CHECKING:
from ...graph import RDFConverter as _RDFConverter
RDFConverter: type[_RDFConverter] | None = _RDFConverter
else:
try:
from ...graph import RDFConverter
except ImportError:
RDFConverter = None
# type aliases
type NodeID = str
"""Unique identifier for a node in the graph."""
type EdgeKey = tuple[NodeID, NodeID]
"""Edge identifier as (source_node, target_node) tuple."""
# domain and type literals
type DomainType = Literal['syntax', 'semantics', 'document']
"""The domain a node or edge belongs to."""
type NodeType = Literal['token', 'predicate', 'argument', 'root']
"""The type of a node within its domain."""
type EdgeType = Literal['head', 'nonhead', 'dependency', 'interface']
"""The type of relationship an edge represents."""
# node attributes can vary based on domain
# common attributes: domain, type, position, form, frompredpatt, semantics
# also includes UDS annotation subspaces and properties
type NodeAttributes = dict[
str,
str | int | bool | dict[str, str] |
dict[str, dict[str, dict[str, str | int | bool | float]]] |
dict[str, dict[str, dict[str, dict[str, str | int | bool | float]]]]
]
"""Dictionary of node attributes including domain, type, and annotation data."""
type EdgeAttributes = dict[
str,
str | int | bool | dict[str, str] |
dict[str, dict[str, dict[str, str | int | bool | float]]] |
dict[str, dict[str, dict[str, dict[str, str | int | bool | float]]]]
]
"""Dictionary of edge attributes including domain, type, and annotation data."""
# Attribute values can be various types
type AttributeValue = str | int | bool | float | dict[str, str]
"""Union of possible attribute value types."""
type QueryResult = dict[str, NodeAttributes] | dict[EdgeKey, EdgeAttributes]
"""Result type for graph queries, either nodes or edges."""
[docs]
class UDSGraph(ABC):
"""Abstract base class for sentence- and document-level graphs.
Parameters
----------
graph
a NetworkX DiGraph
name
a unique identifier for the graph
"""
[docs]
@abstractmethod
def __init__(self, graph: DiGraph, name: str):
self.name = name
self.graph = graph
@property
def nodes(self) -> dict[NodeID, NodeAttributes]:
"""All nodes in the graph with their attributes.
Returns
-------
dict[NodeID, NodeAttributes]
Mapping from node IDs to their attributes
"""
return dict(self.graph.nodes)
@property
def edges(self) -> dict[EdgeKey, EdgeAttributes]:
"""All edges in the graph with their attributes.
Returns
-------
dict[EdgeKey, EdgeAttributes]
Mapping from edge tuples to their attributes
"""
return dict(self.graph.edges)
[docs]
def to_dict(self) -> dict[str, dict[str, dict[str, str | int | bool | dict[str, str]]]]:
"""Convert the graph to adjacency dictionary format.
Returns
-------
dict[str, dict[str, dict[str, str | int | bool | dict[str, str]]]]
NetworkX adjacency data format
"""
return dict(adjacency_data(self.graph))
[docs]
@classmethod
def from_dict(
cls,
graph: dict[str, dict[str, dict[str, str | int | bool | dict[str, str]]]],
name: str = 'UDS'
) -> 'UDSGraph':
"""Construct a UDSGraph from a dictionary.
Parameters
----------
graph
a dictionary constructed by networkx.adjacency_data
name
identifier to append to the beginning of node ids
"""
return cls(adjacency_graph(graph), name)
[docs]
class UDSSentenceGraph(UDSGraph):
"""A Universal Decompositional Semantics sentence-level graph.
Parameters
----------
graph
the NetworkX DiGraph from which the sentence-level graph
is to be constructed
name
the name of the graph
sentence_id
the UD identifier for the sentence associated with this graph
document_id
the UD identifier for the document associated with this graph
"""
QUERIES: ClassVar[dict[str, Query]] = {}
[docs]
@overrides
def __init__(
self,
graph: DiGraph,
name: str,
sentence_id: str | None = None,
document_id: str | None = None
):
super().__init__(graph, name)
self.sentence_id = sentence_id
self.document_id = document_id
self._rdf: Graph | None = None
self._add_performative_nodes()
@property
def rdf(self) -> Graph:
"""The graph converted to RDF format.
Returns
-------
Graph
RDFLib graph representation
Raises
------
AttributeError
If RDFConverter is not available
"""
if self._rdf is None:
if RDFConverter is None:
raise AttributeError("RDFConverter not available")
# Type narrowing: RDFConverter is not None at this point
converter: type[_RDFConverter] = RDFConverter
self._rdf = converter.networkx_to_rdf(self.graph)
return self._rdf
@cached_property
def rootid(self) -> NodeID:
"""The ID of the graph's root node.
Returns
-------
NodeID
The root node identifier
Raises
------
ValueError
If the graph has no root or multiple roots
"""
candidates: list[NodeID] = [
nid for nid, attrs
in self.graph.nodes.items()
if attrs['type'] == 'root'
]
if len(candidates) > 1:
errmsg = f'{self.name} has more than one root'
raise ValueError(errmsg)
if len(candidates) == 0:
errmsg = f'{self.name} has no root'
raise ValueError(errmsg)
return candidates[0]
def _add_performative_nodes(self) -> None:
"""Add performative nodes (author, addressee, root predicate) to the graph.
Creates special nodes that represent the speech act structure:
- semantics-pred-root: The root predicate node
- semantics-arg-0: Argument representing the utterance
- semantics-arg-author: The speaker/writer
- semantics-arg-addressee: The listener/reader
"""
max_preds = self.maxima([
nid for nid, attrs
in self.semantics_nodes.items()
if attrs['frompredpatt']
])
# new nodes
self.graph.add_node(self.graph.name+'-semantics-pred-root',
domain='semantics', type='predicate',
frompredpatt=False)
self.graph.add_node(self.graph.name+'-semantics-arg-0',
domain='semantics', type='argument',
frompredpatt=False)
self.graph.add_node(self.graph.name+'-semantics-arg-author',
domain='semantics', type='argument',
frompredpatt=False)
self.graph.add_node(self.graph.name+'-semantics-arg-addressee',
domain='semantics', type='argument',
frompredpatt=False)
# new semantics edges
for predid in max_preds:
if predid != self.graph.name+'-semantics-pred-root':
self.graph.add_edge(self.graph.name+'-semantics-arg-0',
predid,
domain='semantics', type='head',
frompredpatt=False)
self.graph.add_edge(self.graph.name+'-semantics-pred-root',
self.graph.name+'-semantics-arg-0',
domain='semantics', type='dependency',
frompredpatt=False)
self.graph.add_edge(self.graph.name+'-semantics-pred-root',
self.graph.name+'-semantics-arg-author',
domain='semantics', type='dependency',
frompredpatt=False)
self.graph.add_edge(self.graph.name+'-semantics-pred-root',
self.graph.name+'-semantics-arg-addressee',
domain='semantics', type='dependency',
frompredpatt=False)
# new instance edge
self.graph.add_edge(self.graph.name+'-semantics-arg-0',
self.graph.name+'-root-0',
domain='interface', type='dependency',
frompredpatt=False)
[docs]
@lru_cache(maxsize=128) # noqa: B019
def query(
self,
query: str | Query,
query_type: str | None = None,
cache_query: bool = True,
cache_rdf: bool = True
) -> Result | dict[str, NodeAttributes] | dict[EdgeKey, EdgeAttributes]:
"""Query graph using SPARQL 1.1.
Parameters
----------
query
a SPARQL 1.1 query
query_type
whether this is a 'node' query or 'edge' query. If set to
None (default), a Results object will be returned. The
main reason to use this option is to automatically format
the output of a custom query, since Results objects
require additional postprocessing.
cache_query
whether to cache the query; false when querying
particular nodes or edges using precompiled queries
clear_rdf
whether to delete the RDF constructed for querying
against. This will slow down future queries but saves a
lot of memory
"""
results: Result | dict[str, NodeAttributes] | dict[EdgeKey, EdgeAttributes]
try:
if isinstance(query, str) and cache_query:
if query not in self.__class__.QUERIES:
self.__class__.QUERIES[query] = prepareQuery(query)
query = self.__class__.QUERIES[query]
if query_type == 'node':
results = self._node_query(query, cache_query=cache_query)
elif query_type == 'edge':
results = self._edge_query(query, cache_query=cache_query)
else:
results = self.rdf.query(query)
except ParseException:
errmsg = 'invalid SPARQL 1.1 query'
raise ValueError(errmsg) from None
if not cache_rdf and hasattr(self, '_rdf'):
delattr(self, '_rdf')
return results
def _node_query(
self,
query: str | Query,
cache_query: bool
) -> dict[str, NodeAttributes]:
"""Execute a SPARQL query that returns nodes.
Parameters
----------
query : str | Query
SPARQL query expected to return node IDs
cache_query : bool
Whether to cache the compiled query
Returns
-------
dict[str, NodeAttributes]
Mapping from node IDs to their attributes
Raises
------
ValueError
If query returns non-node results
"""
results: list[str] = [r[0].toPython() # type: ignore[index,union-attr]
for r in self.query(query,
cache_query=cache_query)]
try:
return {nodeid: self.graph.nodes[nodeid] for nodeid in results}
except KeyError:
raise ValueError(
'invalid node query: your query must be guaranteed '
'to capture only nodes, but it appears to also '
'capture edges and/or properties'
) from None
def _edge_query(
self,
query: str | Query,
cache_query: bool
) -> dict[EdgeKey, EdgeAttributes]:
"""Execute a SPARQL query that returns edges.
Parameters
----------
query : str | Query
SPARQL query expected to return edge IDs (format: "node1%%node2")
cache_query : bool
Whether to cache the compiled query
Returns
-------
dict[EdgeKey, EdgeAttributes]
Mapping from edge tuples to their attributes
Raises
------
ValueError
If query returns non-edge results
"""
results: list[tuple[str, str]] = [
tuple(edge[0].toPython().split('%%')) # type: ignore[index,union-attr]
for edge in self.query(query, cache_query=cache_query)
]
try:
return {edge: self.graph.edges[edge]
for edge in results}
except KeyError:
raise ValueError(
'invalid edge query: your query must be guaranteed '
'to capture only edges, but it appears to also '
'capture nodes and/or properties'
) from None
@property
def syntax_nodes(self) -> dict[str, NodeAttributes]:
"""All syntax domain token nodes.
Returns
-------
dict[str, NodeAttributes]
Mapping of node IDs to attributes for syntax tokens
"""
return {
nid: attrs for nid, attrs in self.graph.nodes.items()
if attrs['domain'] == 'syntax'
if attrs['type'] == 'token'
}
@property
def semantics_nodes(self) -> dict[str, NodeAttributes]:
"""All semantics domain nodes.
Returns
-------
dict[str, NodeAttributes]
Mapping of node IDs to attributes for semantics nodes
"""
return {
nid: attrs for nid, attrs
in self.graph.nodes.items()
if attrs['domain'] == 'semantics'
}
@property
def predicate_nodes(self) -> dict[str, NodeAttributes]:
"""All predicate nodes in the semantics domain.
Returns
-------
dict[str, NodeAttributes]
Mapping of node IDs to attributes for predicates
"""
return {
nid: attrs for nid, attrs
in self.graph.nodes.items()
if attrs['domain'] == 'semantics'
if attrs['type'] == 'predicate'
}
@property
def argument_nodes(self) -> dict[str, NodeAttributes]:
"""All argument nodes in the semantics domain.
Returns
-------
dict[str, NodeAttributes]
Mapping of node IDs to attributes for arguments
"""
return {
nid: attrs for nid, attrs
in self.graph.nodes.items()
if attrs['domain'] == 'semantics'
if attrs['type'] == 'argument'
}
@property
def syntax_subgraph(self) -> DiGraph:
"""Subgraph containing only syntax nodes.
Returns
-------
DiGraph
NetworkX subgraph with syntax nodes
"""
return self.graph.subgraph(list(self.syntax_nodes))
@property
def semantics_subgraph(self) -> DiGraph:
"""Subgraph containing only semantics nodes.
Returns
-------
DiGraph
NetworkX subgraph with semantics nodes
"""
return self.graph.subgraph(list(self.semantics_nodes))
[docs]
@lru_cache(maxsize=128) # noqa: B019
def semantics_edges(
self,
nodeid: str | None = None,
edgetype: str | None = None
) -> dict[EdgeKey, EdgeAttributes]:
"""Return edges between semantics nodes.
Parameters
----------
nodeid
The node that must be incident on an edge
edgetype
The type of edge ("dependency" or "head")
"""
if nodeid is None:
candidates = {
eid: attrs for eid, attrs
in self.graph.edges.items()
if attrs['domain'] == 'semantics'
}
else:
candidates = {
eid: attrs for eid, attrs
in self.graph.edges.items()
if attrs['domain'] == 'semantics'
if nodeid in eid
}
if edgetype is None:
return candidates
else:
return {
eid: attrs for eid, attrs in candidates.items()
if attrs['type'] == edgetype
}
[docs]
@lru_cache(maxsize=128) # noqa: B019
def argument_edges(
self,
nodeid: str | None = None
) -> dict[EdgeKey, EdgeAttributes]:
"""Return edges between predicates and their arguments.
Parameters
----------
nodeid
The node that must be incident on an edge
"""
return self.semantics_edges(nodeid, edgetype='dependency')
[docs]
@lru_cache(maxsize=128) # noqa: B019
def argument_head_edges(
self,
nodeid: str | None = None
) -> dict[EdgeKey, EdgeAttributes]:
"""Return edges between nodes and their semantic heads.
Parameters
----------
nodeid
The node that must be incident on an edge
"""
return self.semantics_edges(nodeid, edgetype='head')
[docs]
@lru_cache(maxsize=128) # noqa: B019
def syntax_edges(
self,
nodeid: str | None = None
) -> dict[EdgeKey, EdgeAttributes]:
"""Return edges between syntax nodes.
Parameters
----------
nodeid
The node that must be incident on an edge
"""
if nodeid is None:
return {
eid: attrs for eid, attrs in self.graph.edges.items()
if attrs['domain'] == 'syntax'
}
else:
return {
eid: attrs for eid, attrs
in self.graph.edges.items()
if attrs['domain'] == 'syntax'
if nodeid in eid
}
[docs]
@lru_cache(maxsize=128) # noqa: B019
def instance_edges(
self,
nodeid: str | None = None
) -> dict[EdgeKey, EdgeAttributes]:
"""Return edges between syntax nodes and semantics nodes.
Parameters
----------
nodeid
The node that must be incident on an edge
"""
if nodeid is None:
return {
eid: attrs for eid, attrs
in self.graph.edges.items()
if attrs['domain'] == 'interface'
}
else:
return {
eid: attrs for eid, attrs
in self.graph.edges.items()
if attrs['domain'] == 'interface'
if nodeid in eid
}
[docs]
def span(
self,
nodeid: str,
attrs: list[str] | None = None
) -> dict[int, list[AttributeValue]]:
"""Get the span corresponding to a semantics node.
Parameters
----------
nodeid
the node identifier for a semantics node
attrs
a list of syntax node attributes to return
Returns
-------
a mapping from positions in the span to the requested
attributes in those positions
"""
if attrs is None:
attrs = ['form']
if self.graph.nodes[nodeid]['domain'] != 'semantics':
raise ValueError('Only semantics nodes have (nontrivial) spans')
is_performative = 'pred-root' in nodeid or\
'arg-author' in nodeid or\
'arg-addressee' in nodeid or\
'arg-0' in nodeid
if is_performative:
raise ValueError('Performative nodes do not have spans')
return {self.graph.nodes[e[1]]['position']: [self.graph.nodes[e[1]][a]
for a in attrs]
for e in self.instance_edges(nodeid)}
[docs]
def head(
self,
nodeid: str,
attrs: list[str] | None = None
) -> tuple[int, list[AttributeValue]]:
"""Get the head corresponding to a semantics node.
Parameters
----------
nodeid
the node identifier for a semantics node
attrs
a list of syntax node attributes to return
Returns
-------
a pairing of the head position and the requested
attributes
"""
if attrs is None:
attrs = ['form']
if self.graph.nodes[nodeid]['domain'] != 'semantics':
raise ValueError('Only semantics nodes have heads')
is_performative = 'pred-root' in nodeid or\
'arg-author' in nodeid or\
'arg-addressee' in nodeid or\
'arg-0' in nodeid
if is_performative:
raise ValueError('Performative nodes do not have heads')
return next(
(self.graph.nodes[e[1]]['position'],
[self.graph.nodes[e[1]][a] for a in attrs])
for e, attr in self.instance_edges(nodeid).items()
if attr['type'] == 'head'
)
[docs]
def maxima(self, nodeids: list[str] | None = None) -> list[str]:
"""Find nodes not dominated by any other nodes in the set.
Parameters
----------
nodeids : list[str] | None, optional
Nodes to consider. If None, uses all nodes.
Returns
-------
list[str]
Node IDs that have no incoming edges from other nodes in the set
"""
if nodeids is None:
nodeids = list(self.graph.nodes)
return [nid for nid in nodeids
if all(e[0] == nid
for e in self.graph.edges
if e[0] in nodeids
if e[1] in nodeids
if nid in e)]
[docs]
def minima(self, nodeids: list[str] | None = None) -> list[str]:
"""Find nodes not dominating any other nodes in the set.
Parameters
----------
nodeids : list[str] | None, optional
Nodes to consider. If None, uses all nodes.
Returns
-------
list[str]
Node IDs that have no outgoing edges to other nodes in the set
"""
if nodeids is None:
nodeids = list(self.graph.nodes)
return [nid for nid in nodeids
if all(e[0] != nid
for e in self.graph.edges
if e[0] in nodeids
if e[1] in nodeids
if nid in e)]
[docs]
def add_annotation(
self,
node_attrs: dict[str, NodeAttributes],
edge_attrs: dict[EdgeKey, EdgeAttributes],
add_heads: bool = True,
add_subargs: bool = False,
add_subpreds: bool = False,
add_orphans: bool = False
) -> None:
"""Add node and or edge annotations to the graph.
Parameters
----------
node_attrs
edge_attrs
add_heads
add_subargs
add_subpreds
add_orphans
"""
for node, attrs in node_attrs.items():
self._add_node_annotation(node, attrs,
add_heads, add_subargs,
add_subpreds, add_orphans)
for edge, attrs in edge_attrs.items():
self._add_edge_annotation(edge, attrs)
def _add_node_annotation(
self,
node: NodeID,
attrs: NodeAttributes,
add_heads: bool,
add_subargs: bool,
add_subpreds: bool,
add_orphans: bool
) -> None:
"""Add annotation to a node, potentially creating new nodes.
Parameters
----------
node : NodeID
Node identifier
attrs : NodeAttributes
Attributes to add
add_heads : bool
Whether to add head nodes
add_subargs : bool
Whether to add subargument nodes
add_subpreds : bool
Whether to add subpredicate nodes
add_orphans : bool
Whether to add orphan nodes
"""
if node in self.graph.nodes:
self.graph.nodes[node].update(attrs)
elif 'headof' in attrs and attrs['headof'] in self.graph.nodes:
edge = (attrs['headof'], node)
if not add_heads:
info(
f'head edge {edge} in {self.name} '
'found in annotations but not added'
)
else:
info(f'adding head edge {edge} to {self.name}')
attrs = dict(attrs,
**{'domain': 'semantics',
'type': 'argument',
'frompredpatt': False})
self.graph.add_node(node,
**{k: v
for k, v in attrs.items()
if k not in ['headof',
'head',
'span']})
self.graph.add_edge(*edge, domain='semantics', type='head')
instedge = (node, attrs['head'])
self.graph.add_edge(*instedge, domain='interface', type='head')
# for nonhead in attrs['span']:
# if nonhead != attrs['head']:
# instedge = (node, nonhead)
# self.graph.add_edge(*instedge, domain='interface', type='head')
elif 'subargof' in attrs and attrs['subargof'] in self.graph.nodes:
edge = (attrs['subargof'], node)
if not add_subargs:
info(
f'subarg edge {edge} in {self.name} '
'found in annotations but not added'
)
else:
info(f'adding subarg edge {edge} to {self.name}')
attrs = dict(attrs,
**{'domain': 'semantics',
'type': 'argument',
'frompredpatt': False})
self.graph.add_node(node,
**{k: v
for k, v in attrs.items()
if k != 'subargof'})
self.graph.add_edge(*edge,
domain='semantics',
type='subargument')
instedge = (node, node.replace('semantics-subarg', 'syntax'))
self.graph.add_edge(*instedge, domain='interface', type='head')
elif 'subpredof' in attrs and attrs['subpredof'] in self.graph.nodes:
edge = (attrs['subpredof'], node)
if not add_subpreds:
info(
f'subpred edge {edge} in {self.name} '
'found in annotations but not added'
)
else:
info(
f'adding subpred edge {edge} to {self.name}'
)
attrs = dict(attrs,
**{'domain': 'semantics',
'type': 'predicate',
'frompredpatt': False})
self.graph.add_node(node,
**{k: v
for k, v in attrs.items()
if k != 'subpredof'})
self.graph.add_edge(*edge,
domain='semantics',
type='subpredicate')
instedge = (node, node.replace('semantics-subpred', 'syntax'))
self.graph.add_edge(*instedge, domain='interface', type='head')
elif not add_orphans:
info(
f'orphan node {node} in {self.name} '
'found in annotations but not added'
)
else:
warning(f'adding orphan node {node} in {self.name}')
attrs = dict(attrs,
**{'domain': 'semantics',
'type': 'predicate',
'frompredpatt': False})
self.graph.add_node(node,
**{k: v
for k, v in attrs.items()
if k != 'subpredof'})
synnode = node.replace('semantics-pred', 'syntax')
synnode = synnode.replace('semantics-arg', 'syntax')
synnode = synnode.replace('semantics-subpred', 'syntax')
synnode = synnode.replace('semantics-subarg', 'syntax')
instedge = (node, synnode)
self.graph.add_edge(*instedge, domain='interface', type='head')
if self.rootid is not None:
self.graph.add_edge(self.rootid, node)
def _add_edge_annotation(self, edge: EdgeKey, attrs: EdgeAttributes) -> None:
"""Add annotation to an edge.
Parameters
----------
edge : EdgeKey
Edge tuple (source, target)
attrs : EdgeAttributes
Attributes to add
"""
if edge in self.graph.edges:
self.graph.edges[edge].update(attrs)
else:
warning(f'adding unlabeled edge {edge} to {self.name}')
self.graph.add_edge(*edge, **attrs)
@cached_property
def sentence(self) -> str:
"""The sentence text reconstructed from syntax nodes.
Returns
-------
str
The sentence text with tokens in surface order
"""
id_word = {}
for _, nodeattr in self.syntax_nodes.items():
pos = nodeattr.get('position')
form = nodeattr.get('form')
if isinstance(pos, int) and isinstance(form, str):
id_word[pos - 1] = form
return ' '.join([
id_word[i] for i in range(max(list(id_word.keys()))+1)
])
[docs]
class UDSDocumentGraph(UDSGraph):
"""A Universal Decompositional Semantics document-level graph.
Parameters
----------
graph
the NetworkX DiGraph from which the document-level graph
is to be constructed
name
the name of the graph
"""
[docs]
@overrides
def __init__(self, graph: DiGraph, name: str):
super().__init__(graph, name)
[docs]
def add_annotation(
self,
node_attrs: dict[str, NodeAttributes],
edge_attrs: dict[EdgeKey, EdgeAttributes],
sentence_ids: dict[str, str]
) -> None:
"""Add node and or edge annotations to the graph.
Parameters
----------
node_attrs
the node annotations to be added
edge_attrs
the edge annotations to be added
sentence_ids
the IDs of all sentences in the document
"""
for node, attrs in node_attrs.items():
self._add_node_annotation(node, attrs)
for edge, attrs in edge_attrs.items():
self._add_edge_annotation(edge, attrs, sentence_ids)
def _add_edge_annotation(
self,
edge: EdgeKey,
attrs: EdgeAttributes,
sentence_ids: dict[str, str]
) -> None:
"""Add annotation to a document-level edge.
Parameters
----------
edge : EdgeKey
Edge tuple (source, target)
attrs : EdgeAttributes
Attributes to add
sentence_ids : dict[str, str]
Mapping of graph names to sentence IDs
"""
if edge in self.graph.edges:
self.graph.edges[edge].update(attrs)
else:
# Verify that the annotation is intra-document
s1 = '-'.join(edge[0].split('-')[:3])
s2 = '-'.join(edge[1].split('-')[:3])
if s1 not in sentence_ids or s2 not in sentence_ids:
warning(
f'Skipping cross-document annotation from {edge[0]} '
f'to {edge[1]}'
)
return
attrs = dict(
attrs,
**{'domain': 'document',
'type': 'relation',
'frompredpatt': False,
'id': edge[1]}
)
self.graph.add_edge(*edge, **attrs)
def _add_node_annotation(self, node: NodeID, attrs: NodeAttributes) -> None:
"""Add annotation to a document-level node.
Note: Document-level node annotations are uncommon; most document
annotations are edge-based.
Parameters
----------
node : NodeID
Node identifier
attrs : NodeAttributes
Attributes to add
"""
# we do not currently have a use case for document node annotations,
# but it is included for completeness.
if node in self.graph.nodes:
warning(
f'Attempting to add a node annotation to node {node} '
f'in document graph {self.name}. Document-level '
'annotations should likely be edge attributes.'
)
self.graph.nodes[node].update(attrs)
else:
warning(
f'Attempting to add annotation to unknown node {node} '
f'in document graph {self.name}'
)