Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions dgf/src/analyse/topology/BUILD
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
load("@rules_python//python:py_library.bzl", "py_library")
load("@rules_python//python:py_test.bzl", "py_test")

package(

Expand All @@ -16,13 +17,36 @@ py_library(
],
)

# Library target for betti_defense.py (Betti-number based topological checks
# on in-memory graphs).
py_library(
name = "betti_defense",
srcs = ["betti_defense.py"],
deps = [
"//dgf/src/data:in_memory_graph",
"//dgf/src/data:schema",
# numpy dep,
],
)

# Library target for global_graph_topology.py; depends on the sibling
# :betti_defense and :node_degree targets declared in this package.
py_library(
name = "global_graph_topology",
srcs = ["global_graph_topology.py"],
deps = [
":betti_defense",
":node_degree",
"//dgf/src/data:in_memory_graph",
"//dgf/src/data:schema",
# numpy dep,
],
)

# Unit-test target exercising the :betti_defense library.
py_test(
name = "betti_defense_test",
srcs = ["betti_defense_test.py"],
deps = [
":betti_defense",
# absl/testing:absltest dep,
"//dgf/src/data:in_memory_graph",
"//dgf/src/data:schema",
# numpy dep,
],
)
159 changes: 159 additions & 0 deletions dgf/src/analyse/topology/betti_defense.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Copyright 2024 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Topological anomaly detection using Betti numbers for InMemoryGraph.

This module provides a lightweight defense layer for detecting structural
anomalies in graph inputs by analysing their first Betti number (beta_1),
which counts independent cycles. It is intended to be used as a
pre-processing filter or safety guard in graph neural network pipelines.
"""

from typing import Optional

import numpy as np

from dgf.src.data import in_memory_graph
from dgf.src.data import schema as schema_lib

# Short module-level aliases for the project types used in the annotations
# below.
GraphSchema = schema_lib.GraphSchema
InMemoryGraph = in_memory_graph.InMemoryGraph
InMemoryEdgeSet = in_memory_graph.InMemoryEdgeSet


def connected_components(
    adjacency: np.ndarray,
    num_nodes: int,
) -> np.ndarray:
  """Returns connected-component labels for each node.

  Edges are treated as undirected. The labels are computed with a vectorised
  union-find: every round hooks the larger of the two component roots of each
  cross-component edge onto the smallest root it is connected to, then
  flattens the forest by pointer jumping. Rounds repeat until no edge joins
  two different components.

  A single hooking round is not sufficient: when several edges want to
  re-parent the same root (e.g. edges (0, 2) and (1, 2)), only one of the
  merges can be recorded in that round, so the remaining merges must be
  picked up by subsequent rounds.

  Args:
    adjacency: Integer array of shape [2, num_edges]. Row 0 holds source node
      indices and row 1 holds target node indices.
    num_nodes: Total number of nodes in the graph.

  Returns:
    Integer (int32) array of shape [num_nodes] where equal values denote nodes
    that belong to the same connected component. With this implementation the
    label of a component is the smallest node index it contains.
  """
  # Invariant at the top of every round: `parent` is flat, i.e. parent[i] is
  # the root of i's tree and parent[root] == root.
  parent = np.arange(num_nodes, dtype=np.int32)

  # Guard kept from the original so that edge-less inputs are never used for
  # indexing.
  if adjacency.shape[1] > 0:
    src = adjacency[0]
    tgt = adjacency[1]
    while True:
      root_src = parent[src]
      root_tgt = parent[tgt]
      crossing = root_src != root_tgt
      if not np.any(crossing):
        break

      smaller = np.minimum(root_src, root_tgt)[crossing]
      larger = np.maximum(root_src, root_tgt)[crossing]
      # `larger` may contain repeated roots; ufunc.at applies the minimum for
      # every occurrence, whereas `parent[larger] = smaller` would silently
      # keep only the last write. Roots only ever point to smaller indices,
      # so no cycles can be introduced.
      np.minimum.at(parent, larger, smaller)

      # Pointer jumping: restore the flat-forest invariant.
      while True:
        grandparent = parent[parent]
        if np.array_equal(grandparent, parent):
          break
        parent = grandparent

  return parent


def _num_connected_components(
    adjacency: np.ndarray,
    num_nodes: int,
) -> int:
  """Counts the connected components of the graph.

  Args:
    adjacency: Integer array of shape [2, num_edges].
    num_nodes: Total number of nodes in the graph.

  Returns:
    The number of distinct labels produced by `connected_components`.
  """
  distinct_labels = np.unique(connected_components(adjacency, num_nodes))
  return int(distinct_labels.shape[0])


def calculate_betti_1(
    graph: InMemoryGraph,
    schema: GraphSchema,
) -> int:
  """Computes the first Betti number (beta_1) of a homogeneous graph.

  The value is obtained from the formula

    beta_1 = |E| - |V| + C

  with |E| the edge count, |V| the vertex count and C the number of connected
  components.

  Args:
    graph: An InMemoryGraph. Must be homogeneous (single node set, single
      edge set, source == target).
    schema: The GraphSchema describing `graph`.

  Returns:
    beta_1 as an integer; 0 when the graph has no nodes.

  Raises:
    ValueError: If the graph is not homogeneous.
  """
  homogeneous = _is_homogeneous_graph(graph, schema)
  if not homogeneous:
    raise ValueError(
        "calculate_betti_1 currently only supports homogeneous graphs.")

  # The homogeneity check guarantees exactly one node set and one edge set.
  sole_node_set = next(iter(graph.node_sets.values()))
  sole_edge_set = next(iter(graph.edge_sets.values()))

  # A missing (None) node count is treated as zero.
  vertex_count = sole_node_set.num_nodes or 0
  edge_count = sole_edge_set.num_edges()
  if vertex_count == 0:
    return 0

  component_count = _num_connected_components(
      sole_edge_set.adjacency, vertex_count)
  return edge_count - vertex_count + component_count


def is_anomalous(
    graph: InMemoryGraph,
    schema: GraphSchema,
    expected_max_betti: int = 1,
) -> bool:
  """Reports whether a graph's beta_1 is strictly above a threshold.

  Args:
    graph: An InMemoryGraph. Must be homogeneous.
    schema: The GraphSchema describing `graph`.
    expected_max_betti: Largest beta_1 still considered benign.

  Returns:
    True when `calculate_betti_1(graph, schema)` exceeds
    `expected_max_betti`, False otherwise.

  Raises:
    ValueError: Propagated from `calculate_betti_1` for non-homogeneous
      graphs.
  """
  return calculate_betti_1(graph, schema) > expected_max_betti


def _is_homogeneous_graph(
    graph: InMemoryGraph,
    schema: GraphSchema,
) -> bool:
  """Checks whether `graph`/`schema` describe a homogeneous graph.

  Homogeneous here means: the graph and the schema each hold exactly one node
  set and one edge set, and the schema's edge set uses the schema's single
  node set as both its source and its target.

  Args:
    graph: The InMemoryGraph to inspect.
    schema: The GraphSchema to inspect.

  Returns:
    True if all of the conditions above hold, False otherwise.
  """
  if len(graph.node_sets) != 1 or len(graph.edge_sets) != 1:
    return False
  if len(schema.node_sets) != 1 or len(schema.edge_sets) != 1:
    return False

  edge_schema = next(iter(schema.edge_sets.values()))
  sole_node_name = next(iter(schema.node_sets.keys()))
  if sole_node_name != edge_schema.source:
    return False
  if sole_node_name != edge_schema.target:
    return False
  return True
139 changes: 139 additions & 0 deletions dgf/src/analyse/topology/betti_defense_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Copyright 2024 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for betti_defense."""

from absl.testing import absltest
import numpy as np
from dgf.src.analyse.topology import betti_defense
from dgf.src.data import in_memory_graph
from dgf.src.data import schema as schema_lib


class BettiDefenseTest(absltest.TestCase):
  """Unit tests for calculate_betti_1 and is_anomalous."""

  def _make_homogeneous_graph(
      self,
      num_nodes: int,
      edges: list[tuple[int, int]],
  ) -> tuple[in_memory_graph.InMemoryGraph, schema_lib.GraphSchema]:
    """Builds a single-node-set, single-edge-set graph and its schema."""
    if edges:
      # List of (src, tgt) pairs -> array of shape [2, num_edges].
      adjacency = np.array(edges, dtype=np.int32).T
    else:
      adjacency = np.zeros((2, 0), dtype=np.int32)

    node_set = in_memory_graph.InMemoryNodeSet(
        num_nodes=num_nodes,
        features={},
    )
    edge_set = in_memory_graph.InMemoryEdgeSet(
        adjacency=adjacency,
        features={},
    )
    graph = in_memory_graph.InMemoryGraph(
        node_sets={"nodes": node_set},
        edge_sets={"edges": edge_set},
    )

    node_schema = schema_lib.NodeSchema(features={})
    edge_schema = schema_lib.EdgeSchema(
        source="nodes",
        target="nodes",
        features={},
    )
    schema = schema_lib.GraphSchema(
        node_sets={"nodes": node_schema},
        edge_sets={"edges": edge_schema},
    )
    return graph, schema

  def _assert_betti_1(
      self,
      num_nodes: int,
      edges: list[tuple[int, int]],
      expected: int,
  ) -> None:
    """Asserts beta_1 of the described homogeneous graph equals `expected`."""
    graph, schema = self._make_homogeneous_graph(num_nodes, edges)
    self.assertEqual(betti_defense.calculate_betti_1(graph, schema), expected)

  def test_empty_graph(self):
    self._assert_betti_1(0, [], 0)

  def test_single_node(self):
    self._assert_betti_1(1, [], 0)

  def test_tree_path_4(self):
    # Path 0-1-2-3: |V| = 4, |E| = 3, C = 1, hence beta_1 = 0.
    self._assert_betti_1(4, [(0, 1), (1, 2), (2, 3)], 0)

  def test_cycle_5(self):
    # Ring 0-1-2-3-4-0: |V| = 5, |E| = 5, C = 1, hence beta_1 = 1.
    ring = [(0, 1), (1, 2), (2, 3), (3, 4), (4, 0)]
    self._assert_betti_1(5, ring, 1)

  def test_two_triangles(self):
    # Two disjoint triangles: |V| = 6, |E| = 6, C = 2, hence beta_1 = 2.
    first_triangle = [(0, 1), (1, 2), (2, 0)]
    second_triangle = [(3, 4), (4, 5), (5, 3)]
    self._assert_betti_1(6, first_triangle + second_triangle, 2)

  def test_complete_k4(self):
    # K4: |V| = 4, |E| = 6, C = 1, hence beta_1 = 3.
    k4_edges = [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
    self._assert_betti_1(4, k4_edges, 3)

  def test_no_edges(self):
    self._assert_betti_1(3, [], 0)

  def test_is_anomalous(self):
    clean, schema = self._make_homogeneous_graph(4, [(0, 1), (1, 2), (2, 3)])
    anomalous, _ = self._make_homogeneous_graph(
        5, [(0, 1), (1, 2), (2, 3), (3, 4), (4, 0)])

    # The path has beta_1 = 0, the 5-cycle has beta_1 = 1.
    clean_at_zero = betti_defense.is_anomalous(clean, schema, 0)
    cycle_at_zero = betti_defense.is_anomalous(anomalous, schema, 0)
    cycle_at_one = betti_defense.is_anomalous(anomalous, schema, 1)

    self.assertFalse(clean_at_zero)
    self.assertTrue(cycle_at_zero)
    self.assertFalse(cycle_at_one)

  def test_self_loops(self):
    # A self-loop contributes an edge without merging components:
    # |V| = 2, |E| = 2 (one regular edge, one self-loop), C = 1 -> beta_1 = 1.
    self._assert_betti_1(2, [(0, 1), (1, 1)], 1)

  def test_raises_on_heterogeneous(self):
    # Two node sets with an edge set running n1 -> n2 is not homogeneous.
    node_sets = {
        "n1": in_memory_graph.InMemoryNodeSet(num_nodes=2, features={}),
        "n2": in_memory_graph.InMemoryNodeSet(num_nodes=2, features={}),
    }
    edge_sets = {
        "e1": in_memory_graph.InMemoryEdgeSet(
            adjacency=np.array([[0], [1]], dtype=np.int32),
            features={},
        )
    }
    graph = in_memory_graph.InMemoryGraph(
        node_sets=node_sets,
        edge_sets=edge_sets,
    )
    schema = schema_lib.GraphSchema(
        node_sets={
            "n1": schema_lib.NodeSchema(features={}),
            "n2": schema_lib.NodeSchema(features={}),
        },
        edge_sets={
            "e1": schema_lib.EdgeSchema(
                source="n1", target="n2", features={}),
        },
    )
    with self.assertRaises(ValueError):
      betti_defense.calculate_betti_1(graph, schema)


if __name__ == "__main__":
absltest.main()
15 changes: 12 additions & 3 deletions dgf/src/analyse/topology/global_graph_topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import dataclasses
from typing import Dict, Optional
from dgf.src.analyse.topology import betti_defense
from dgf.src.analyse.topology import node_degree
from dgf.src.data import in_memory_graph
from dgf.src.data import schema as schema_lib
Expand Down Expand Up @@ -73,6 +74,7 @@ class GlobalGraphTopology:
graph_diameter: Optional[float] = None
homophily_ratio: Optional[float] = None
degree_distribution: Optional[Dict[int, int]] = None
betti_1: Optional[int] = None

## No need to call post_init. Use update_graph_density() to auto-calculate
## derived statistics if not provided.
Expand Down Expand Up @@ -121,14 +123,20 @@ def get_in_memory_graph_topology(
degree_distribution = dict(zip(degree.tolist(), counts.tolist()))

## Connected Components
cc = np.array([])
num_cc, cc_counts = np.unique(cc, return_counts=True)
largest_cc = np.max(cc_counts).item()
adj = next(iter(graph.edge_sets.values())).adjacency
num_nodes = next(iter(graph.node_sets.values())).num_nodes or 0
cc_labels = betti_defense.connected_components(adj, num_nodes)
num_cc, cc_counts = np.unique(cc_labels, return_counts=True)
largest_cc = int(np.max(cc_counts))

## Betti-1
betti_1 = edge_set.num_edges() - num_nodes + int(num_cc.shape[0])
else:
average_degree = None
degree_distribution = None
num_cc = None
largest_cc = None
betti_1 = None

num_connected_components = num_cc.shape[0] if num_cc is not None else None
ggt = GlobalGraphTopology(
Expand All @@ -138,6 +146,7 @@ def get_in_memory_graph_topology(
num_connected_components=num_connected_components,
largest_component_size=largest_cc,
degree_distribution=degree_distribution,
betti_1=betti_1,
)

ggt.update_graph_density()
Expand Down
Loading