From 44111d0bdaebecccf821ea3fc101c831efb29273 Mon Sep 17 00:00:00 2001 From: Simon Meierhans Date: Fri, 3 Jul 2026 07:13:07 -0700 Subject: [PATCH] Add synthetic temporal alert graph generator. PiperOrigin-RevId: 942129546 --- dgf/src/util/BUILD | 36 +++ dgf/src/util/gen_temporal_alert_graph.py | 286 ++++++++++++++++++ dgf/src/util/gen_temporal_alert_graph_main.py | 68 +++++ dgf/src/util/gen_temporal_alert_graph_test.py | 68 +++++ dgf/src/util/gen_test_graph.py | 2 + 5 files changed, 460 insertions(+) create mode 100644 dgf/src/util/gen_temporal_alert_graph.py create mode 100644 dgf/src/util/gen_temporal_alert_graph_main.py create mode 100644 dgf/src/util/gen_temporal_alert_graph_test.py diff --git a/dgf/src/util/BUILD b/dgf/src/util/BUILD index 19a42bf..b4a2aaf 100644 --- a/dgf/src/util/BUILD +++ b/dgf/src/util/BUILD @@ -1,5 +1,6 @@ load("@rules_python//python:py_library.bzl", "py_library") load("@rules_python//python:py_test.bzl", "py_test") +load("@rules_python//python:py_binary.bzl", "py_binary") load("@rules_cc//cc:cc_library.bzl", "cc_library") load("@rules_cc//cc:cc_test.bzl", "cc_test") load("@nanobind_bazel//:build_defs.bzl", "nanobind_extension") @@ -44,6 +45,30 @@ py_library( ], ) +py_library( + name = "gen_temporal_alert_graph", + srcs = ["gen_temporal_alert_graph.py"], + visibility = ["//dgf:__subpackages__"], + deps = [ + "//dgf/src/data:in_memory_graph", + "//dgf/src/data:schema", + "//dgf/src/io:graph_in_memory", + # numpy dep, + ], +) + +py_binary( + name = "gen_temporal_alert_graph_main", + srcs = ["gen_temporal_alert_graph_main.py"], + visibility = ["//dgf:__subpackages__"], + deps = [ + ":gen_temporal_alert_graph", + # absl:app dep, + # absl/flags dep, + # absl/logging dep, + ], +) + py_library( name = "proto", srcs = ["proto.py"], @@ -241,6 +266,17 @@ py_test( ], ) +py_test( + name = "gen_temporal_alert_graph_test", + srcs = ["gen_temporal_alert_graph_test.py"], + deps = [ + ":gen_temporal_alert_graph", + # absl/testing:absltest dep, + 
"//dgf/src/validate:in_memory_graph", + # numpy dep, + ], +) + py_test( name = "shard_test", srcs = ["shard_test.py"], diff --git a/dgf/src/util/gen_temporal_alert_graph.py b/dgf/src/util/gen_temporal_alert_graph.py new file mode 100644 index 0000000..02ffcaa --- /dev/null +++ b/dgf/src/util/gen_temporal_alert_graph.py @@ -0,0 +1,286 @@ +# Copyright 2022 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Utility to generate synthetic temporal alert regression graphs. + +This module simulates a heterogeneous, bipartite temporal graph between hardware +monitoring devices and alert events, useful for benchmarking and testing temporal +Graph Neural Network (GNN) regression tasks. + +Graph Topology & Schema: + * Node Set `hardware`: + * `#id`: Primary ID (`hw_0`, `hw_1`, ...). + * `time`: 1D array of irregularly sampled timestamps. + * `signal`: 1D array of noisy sinusoidal telemetry values sampled at `time`. + * Node Set `alerts`: + * `#id`: Primary ID (`alert_0`, `alert_1`, ...). + * `#creation_time`: Timestamp when the alert was triggered. + * `signal_regression`: Target continuous regression label. + * Edge Set `hardware_to_alert`: + * Directed edges linking hardware nodes to alert nodes. 
def generate_signal_regression_schema() -> schema_lib.GraphSchema:
  """Builds the schema of the hardware/alert signal regression graph.

  The schema declares two node sets and one edge set:
    * `hardware`: a primary `#id` plus two variable-length time series, `time`
      (64-bit integer timestamps) and `signal` (32-bit floats whose timestamps
      are taken from the `time` feature).
    * `alerts`: a primary `#id`, a scalar `#creation_time` timestamp and the
      scalar 32-bit float `signal_regression`.
    * `hardware_to_alert`: edges with source `hardware` and target `alerts`.

  Returns:
    A GraphSchema for the signal regression task.
  """
  feature = schema_lib.FeatureSchema
  fmt = schema_lib.FeatureFormat
  sem = schema_lib.FeatureSemantic

  hardware_features = {
      "#id": feature(format=fmt.BYTES, semantic=sem.PRIMARY_ID),
      "time": feature(
          format=fmt.INTEGER_64,
          semantic=sem.TIMESTAMP,
          shape=(None,),
          is_timeseries=True,
      ),
      "signal": feature(
          format=fmt.FLOAT_32,
          semantic=sem.NUMERICAL,
          shape=(None,),
          is_timeseries=True,
          timestamps="time",
      ),
  }
  alert_features = {
      "#id": feature(format=fmt.BYTES, semantic=sem.PRIMARY_ID),
      "#creation_time": feature(
          format=fmt.INTEGER_64, semantic=sem.TIMESTAMP
      ),
      "signal_regression": feature(
          format=fmt.FLOAT_32, semantic=sem.NUMERICAL
      ),
  }

  node_sets = {
      "hardware": schema_lib.NodeSchema(features=hardware_features),
      "alerts": schema_lib.NodeSchema(features=alert_features),
  }
  edge_sets = {
      "hardware_to_alert": schema_lib.EdgeSchema(
          source="hardware", target="alerts"
      ),
  }
  return schema_lib.GraphSchema(node_sets=node_sets, edge_sets=edge_sets)
def _pack_ragged(rows: list[np.ndarray]) -> np.ndarray:
  """Packs per-node series into a 1-D object array, one array per node.

  `np.array(rows, dtype=object)` only produces that layout when the rows
  differ in length. Equally long rows (a single node, or zero jitter) would
  be stacked into a 2-D object array of scalars instead, so the array is
  preallocated and filled element by element.

  Args:
    rows: The per-node arrays to pack.

  Returns:
    An object array of shape `(len(rows),)` whose i-th element is `rows[i]`.
  """
  packed = np.empty(len(rows), dtype=np.object_)
  for i, row in enumerate(rows):
    packed[i] = row
  return packed


def _integrate_signal(
    hw_t: np.ndarray, hw_s: np.ndarray, t_start: int, t_end: int
) -> float:
  """Integrates one sampled signal over the window [t_start, t_end].

  Samples are joined by straight lines and held constant before the first and
  after the last sample. The area is accumulated with the trapezoid rule
  between consecutive knots: both window bounds plus every sample timestamp
  that falls inside the window.

  Args:
    hw_t: Strictly increasing sample timestamps.
    hw_s: Signal values sampled at `hw_t`.
    t_start: Inclusive start of the integration window.
    t_end: Inclusive end of the integration window.

  Returns:
    The integral of the interpolated signal, or 0.0 if there are no samples.
  """
  if len(hw_t) == 0:
    return 0.0

  def value_at(t_val: float) -> float:
    # Outside the sampled range the nearest sample is held constant.
    if t_val <= hw_t[0]:
      return float(hw_s[0])
    if t_val >= hw_t[-1]:
      return float(hw_s[-1])
    idx = int(np.searchsorted(hw_t, t_val))
    if hw_t[idx] == t_val:
      return float(hw_s[idx])
    t0, t1 = hw_t[idx - 1], hw_t[idx]
    s0, s1 = hw_s[idx - 1], hw_s[idx]
    frac = (t_val - t0) / (t1 - t0)
    return float(s0 + frac * (s1 - s0))

  in_window = hw_t[(hw_t >= t_start) & (hw_t <= t_end)]
  # The set drops a sample that coincides with a window bound.
  knots = sorted({float(t_start), float(t_end), *map(float, in_window)})

  area = 0.0
  for ta, tb in zip(knots, knots[1:]):
    area += 0.5 * (value_at(ta) + value_at(tb)) * (tb - ta)
  return area


def generate_signal_regression_in_memory_graph(
    num_hardware: int = 10,
    num_alerts: int = 10,
    start_time: int = 1700000000,
    duration: int = 86400,
    sample_interval_mean: int = 1200,
    sample_interval_jitter: int = 300,
    window_duration: int = 3600,
    max_num_neighbors: int = 5,
    seed: int = 42,
) -> "in_memory_graph_lib.InMemoryGraph":
  """Generates an in-memory heterogeneous temporal signal regression graph.

  Every hardware node carries an irregularly sampled noisy sine wave. Every
  alert is linked to a random set of hardware nodes, and its regression value
  is the sum of the neighbours' signal integrals over the lookback window,
  divided by the window length.

  Args:
    num_hardware: Number of hardware nodes to generate.
    num_alerts: Number of alert nodes to generate.
    start_time: Unix epoch start time in seconds.
    duration: Total simulation time interval in seconds.
    sample_interval_mean: Mean interval between hardware timestamps in seconds.
    sample_interval_jitter: Max jitter added to timestamp intervals in seconds.
    window_duration: Lookback window in seconds for the signal integral.
    max_num_neighbors: Maximum number of connected hardware nodes per alert.
    seed: Random seed for reproducible graph generation.

  Returns:
    An InMemoryGraph object.

  Raises:
    ValueError: If `window_duration` exceeds `duration` or is not positive, or
      if alerts are requested while `num_hardware` or `max_num_neighbors` is
      below 1.
  """
  if window_duration > duration:
    raise ValueError(
        f"'window_duration' ({window_duration}) must be less than or equal to"
        f" 'duration' ({duration})."
    )
  if window_duration <= 0:
    # The regression value is normalized by the window length.
    raise ValueError(f"'window_duration' ({window_duration}) must be positive.")
  max_degree = min(max_num_neighbors, num_hardware)
  if num_alerts > 0 and max_degree < 1:
    # Every alert draws at least one neighbour; fail early with a clear
    # message instead of an opaque error from the random number generator.
    raise ValueError(
        "'num_hardware' and 'max_num_neighbors' must both be at least 1 when"
        f" alerts are generated, got {num_hardware} and {max_num_neighbors}."
    )

  rng = np.random.RandomState(seed)
  end_time = start_time + duration

  hw_ids = [f"hw_{i}".encode("utf-8") for i in range(num_hardware)]
  hw_times_list = []
  hw_signals_list = []
  for _ in range(num_hardware):
    t = start_time
    times = []
    signals = []
    while t <= end_time:
      times.append(int(t))
      # One full sine period over the simulation plus Gaussian noise.
      phase = 2.0 * np.pi * (t - start_time) / float(duration)
      signals.append(float(np.sin(phase) + rng.normal(0, 0.1)))
      step = sample_interval_mean + rng.randint(
          -sample_interval_jitter, sample_interval_jitter + 1
      )
      # Always advance by at least one second so timestamps strictly increase.
      t += max(1, step)
    hw_times_list.append(np.array(times, dtype=np.int64))
    hw_signals_list.append(np.array(signals, dtype=np.float32))

  alert_ids = [f"alert_{i}".encode("utf-8") for i in range(num_alerts)]
  # Alerts are created late enough that the window starts at or after
  # `start_time`.
  alert_creation_times = start_time + rng.randint(
      window_duration, duration + 1, size=num_alerts
  )

  edge_sources = []
  edge_targets = []
  alert_regression = []
  for alert_idx in range(num_alerts):
    t_create = int(alert_creation_times[alert_idx])
    t_start = t_create - window_duration
    degree = rng.randint(1, max_degree + 1)
    neighbors = rng.choice(num_hardware, size=degree, replace=False)

    total_integral = 0.0
    for hw_idx in neighbors:
      edge_sources.append(hw_idx)
      edge_targets.append(alert_idx)
      total_integral += _integrate_signal(
          hw_times_list[hw_idx], hw_signals_list[hw_idx], t_start, t_create
      )
    alert_regression.append(total_integral / float(window_duration))

  hardware = in_memory_graph_lib.InMemoryNodeSet(
      features={
          "#id": np.array(hw_ids, dtype=np.bytes_),
          "time": _pack_ragged(hw_times_list),
          "signal": _pack_ragged(hw_signals_list),
      },
      num_nodes=num_hardware,
  )
  alerts = in_memory_graph_lib.InMemoryNodeSet(
      features={
          "#id": np.array(alert_ids, dtype=np.bytes_),
          "#creation_time": np.array(alert_creation_times, dtype=np.int64),
          "signal_regression": np.array(alert_regression, dtype=np.float32),
      },
      num_nodes=num_alerts,
  )
  # Row 0 holds hardware (source) indices, row 1 alert (target) indices.
  adjacency = np.array([edge_sources, edge_targets], dtype=np.int64)

  return in_memory_graph_lib.InMemoryGraph(
      node_sets={"hardware": hardware, "alerts": alerts},
      edge_sets={
          "hardware_to_alert": in_memory_graph_lib.InMemoryEdgeSet(
              adjacency=adjacency
          ),
      },
  )


def generate_signal_regression_graph(
    path: str,
    num_hardware: int = 10,
    num_alerts: int = 10,
    start_time: int = 1700000000,
    duration: int = 86400,
    sample_interval_mean: int = 1200,
    sample_interval_jitter: int = 300,
    window_duration: int = 3600,
    max_num_neighbors: int = 5,
    seed: int = 42,
) -> None:
  """Generates a signal regression temporal graph on disk in GF format.

  Args:
    path: The directory path to write the generated GF graph.
    num_hardware: Number of hardware nodes to generate.
    num_alerts: Number of alert nodes to generate.
    start_time: Unix epoch start time in seconds.
    duration: Total simulation time interval in seconds.
    sample_interval_mean: Mean interval between hardware timestamps in seconds.
    sample_interval_jitter: Max jitter added to timestamp intervals in seconds.
    window_duration: Lookback window in seconds for the signal integral.
    max_num_neighbors: Maximum number of connected hardware nodes per alert.
    seed: Random seed for reproducible graph generation.

  Raises:
    ValueError: If the generator rejects the arguments (see
      `generate_signal_regression_in_memory_graph`).
  """
  schema = generate_signal_regression_schema()
  graph = generate_signal_regression_in_memory_graph(
      num_hardware=num_hardware,
      num_alerts=num_alerts,
      start_time=start_time,
      duration=duration,
      sample_interval_mean=sample_interval_mean,
      sample_interval_jitter=sample_interval_jitter,
      window_duration=window_duration,
      max_num_neighbors=max_num_neighbors,
      seed=seed,
  )
  gf_graph_in_memory_lib.write_graph(graph, schema, path)
FLAGS = flags.FLAGS

flags.DEFINE_string(
    "base_output_dir",
    None,
    "Base directory to write the generated synthetic temporal datasets.",
    required=True,
)
flags.DEFINE_integer("seed", 42, "Random seed for graph generation.")
flags.DEFINE_integer(
    "max_num_neighbors",
    5,
    "Maximum number of connected hardware nodes per alert.",
)

# (dataset name, number of hardware nodes, number of alert nodes)
_DATASETS = (
    ("alert_regression_100k", 50000, 50000),
)


def main(argv):
  """Writes every predefined dataset below `--base_output_dir`.

  Args:
    argv: Command-line arguments left over after flag parsing; only the
      program name is accepted.

  Raises:
    app.UsageError: If positional arguments are passed.
  """
  extra_args = argv[1:]
  if extra_args:
    raise app.UsageError("Too many command-line arguments.")

  output_root = FLAGS.base_output_dir
  for dataset_name, hardware_count, alert_count in _DATASETS:
    destination = os.path.join(output_root, dataset_name)
    logging.info(
        "Generating %s (%d HW nodes, %d Alert nodes, max %d neighbors) to %s",
        dataset_name,
        hardware_count,
        alert_count,
        FLAGS.max_num_neighbors,
        destination,
    )
    gen_temporal_alert_graph.generate_signal_regression_graph(
        path=destination,
        num_hardware=hardware_count,
        num_alerts=alert_count,
        max_num_neighbors=FLAGS.max_num_neighbors,
        seed=FLAGS.seed,
    )
    logging.info("Successfully wrote %s to %s", dataset_name, destination)


if __name__ == "__main__":
  app.run(main)
def _list_all_files(root_dir: str) -> list[str]:
  """Returns the paths of all files under `root_dir`, relative to it."""
  return [
      os.path.relpath(os.path.join(parent, file_name), root_dir)
      for parent, _, file_names in os.walk(root_dir)
      for file_name in file_names
  ]


class GenTemporalAlertGraphTest(absltest.TestCase):
  """Tests for the synthetic temporal alert graph generator."""

  def test_generate_signal_regression_dataset(self):
    # The generated graph must validate against its own schema and honour the
    # requested node counts.
    hardware_count = 15
    alert_count = 25
    schema = gen_temporal_alert_graph.generate_signal_regression_schema()
    graph = gen_temporal_alert_graph.generate_signal_regression_in_memory_graph(
        num_hardware=hardware_count, num_alerts=alert_count, seed=42
    )
    in_memory_graph_validate_lib.validate_graph(graph, schema)

    hardware = graph.node_sets["hardware"]
    alerts = graph.node_sets["alerts"]
    self.assertEqual(hardware.num_nodes, hardware_count)
    self.assertEqual(alerts.num_nodes, alert_count)
    self.assertEqual(alerts.features["signal_regression"].dtype, np.float32)
    # Default start time plus the default lookback window: no alert may be
    # created before a full window has elapsed.
    earliest_allowed = 1700000000 + 3600
    self.assertTrue(
        np.all(alerts.features["#creation_time"] >= earliest_allowed)
    )

  def test_generate_signal_regression_graph(self):
    # Writing to disk must produce the metadata and schema files.
    work_dir = self.create_tempdir().full_path
    gen_temporal_alert_graph.generate_signal_regression_graph(work_dir)
    written = _list_all_files(work_dir)
    self.assertIn("metadata.json", written)
    self.assertIn("schema.json", written)

  def test_generate_signal_regression_invalid_window(self):
    # A lookback window longer than the simulation is rejected.
    expected_message = (
        r"'window_duration' \(2000\) must be less than or equal to 'duration'"
        r" \(1000\)\."
    )
    with self.assertRaisesRegex(ValueError, expected_message):
      gen_temporal_alert_graph.generate_signal_regression_in_memory_graph(
          duration=1000, window_duration=2000
      )


if __name__ == "__main__":
  absltest.main()