Skip to content

Commit fa62677

Browse files
authored
Merge pull request #1443 from erikoqvist/produce_updates_to_kafka
Produce updates to kafka
2 parents dbdd40c + b2e086d commit fa62677

13 files changed

Lines changed: 628 additions & 9 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@
77
docs/sphinx/_build/
88
nipap-www/nipap_www.egg-info
99
nipap/nipap.egg-info
10+
/.idea/

nipap/nipap.conf.dist

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,3 +212,41 @@ secret_key = {{WWW_SECRET_KEY}}
212212
# otlp_http_endpoint=http://opentelemetry-collector:4318/v1/traces
213213
# Set sampler. Valid values are always_on, always_off, parentbased_always_on, parentbased_always_off, traceidratio and parentbased_traceidratio. Default is parentbased_always_on.
214214
# otel_traces_sampler = always_on
215+
216+
#
217+
# Kafka event producer configuration
218+
#
219+
[kafka]
220+
# Enable running the external kafka producer process (true/false)
221+
# If true, nipapd will spawn the kafka_producer as a separate process.
222+
enabled = false
223+
224+
# Comma-separated list of Kafka brokers, e.g. localhost:9092,broker2:9092
225+
#brokers = localhost:9092
226+
227+
# Poll interval in seconds for the kafka producer to poll the DB
228+
#poll_interval = 2
229+
230+
# Topic prefix for produced events (defaults to "nipap.")
231+
#topic_prefix = nipap.
232+
233+
# Security protocol for Kafka connection (e.g., PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL)
234+
# Defaults to PLAINTEXT if not specified
235+
#security_protocol = PLAINTEXT
236+
237+
# SASL authentication settings (required when using SASL_PLAINTEXT or SASL_SSL)
238+
# SASL mechanism to use (e.g., PLAIN, SCRAM-SHA-256, SCRAM-SHA-512)
239+
#sasl_mechanism = PLAIN
240+
241+
# SASL username for authentication
242+
#sasl_username = your_username
243+
244+
# SASL password for authentication
245+
#sasl_password = your_password
246+
247+
# SSL/TLS settings (used with SSL or SASL_SSL security protocols)
248+
# Path to CA certificate file for SSL/TLS certificate verification
249+
#ssl_cafile = /path/to/ca-cert.pem
250+
251+
# Enable/disable SSL hostname verification (true/false)
252+
#ssl_check_hostname = true

nipap/nipap/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
__version__ = "0.32.7"
2-
__db_version__ = 7
2+
__db_version__ = 8
33
__author__ = "Kristian Larsson, Lukas Garberg"
44
__author_email__ = "kll@tele2.net, lukas@spritelink.net"
55
__copyright__ = "Copyright 2011-2014, Kristian Larsson, Lukas Garberg"

nipap/nipap/db_schema.py

Lines changed: 101 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,17 @@
529529
RETURN (part_one::bigint << 32) + part_two::bigint;
530530
END;
531531
$_$ LANGUAGE plpgsql IMMUTABLE STRICT;
532-
"""
532+
533+
CREATE OR REPLACE FUNCTION tf_kafka_produce_event() RETURNS trigger AS $$
534+
BEGIN
535+
IF TG_OP = 'DELETE' THEN
536+
INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb);
537+
ELSIF OLD IS DISTINCT FROM NEW THEN
538+
INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb);
539+
END IF;
540+
RETURN NEW;
541+
END;
542+
$$ LANGUAGE plpgsql;"""
533543

534544
ip_net = """
535545
--------------------------------------------
@@ -538,7 +548,7 @@
538548
--
539549
--------------------------------------------
540550
541-
COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 7';
551+
COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 8';
542552
543553
CREATE EXTENSION IF NOT EXISTS ip4r;
544554
CREATE EXTENSION IF NOT EXISTS hstore;
@@ -790,7 +800,21 @@
790800
CREATE INDEX ip_net_log__prefix__index ON ip_net_log(prefix_id);
791801
CREATE INDEX ip_net_log__pool__index ON ip_net_log(pool_id);
792802
793-
"""
803+
--
804+
-- Kafka event table and triggers
805+
--
806+
-- This table is used as a queue for the external kafka_producer process.
807+
-- Triggers on the core tables insert events here. The daemon will enable or
808+
-- disable these triggers at startup depending on configuration.
809+
--
810+
CREATE TABLE IF NOT EXISTS kafka_produce_event (
811+
id SERIAL PRIMARY KEY,
812+
table_name TEXT NOT NULL,
813+
event_type TEXT NOT NULL,
814+
payload JSONB,
815+
processed BOOLEAN DEFAULT FALSE,
816+
created_at TIMESTAMP WITH TIME ZONE DEFAULT now()
817+
);"""
794818

795819
triggers = """
796820
--
@@ -1768,7 +1792,25 @@
17681792
WHEN (OLD.ipv4_default_prefix_length IS DISTINCT FROM NEW.ipv4_default_prefix_length
17691793
OR OLD.ipv6_default_prefix_length IS DISTINCT FROM NEW.ipv6_default_prefix_length)
17701794
EXECUTE PROCEDURE tf_ip_net_pool__iu_before();
1771-
"""
1795+
1796+
-- Triggers that write to kafka_produce_event
1797+
CREATE TRIGGER trigger_kafka_ip_net_plan
1798+
AFTER INSERT OR UPDATE OR DELETE
1799+
ON ip_net_plan
1800+
FOR EACH ROW
1801+
EXECUTE PROCEDURE tf_kafka_produce_event();
1802+
1803+
CREATE TRIGGER trigger_kafka_ip_net_vrf
1804+
AFTER INSERT OR UPDATE OR DELETE
1805+
ON ip_net_vrf
1806+
FOR EACH ROW
1807+
EXECUTE PROCEDURE tf_kafka_produce_event();
1808+
1809+
CREATE TRIGGER trigger_kafka_ip_net_pool
1810+
AFTER INSERT OR UPDATE OR DELETE
1811+
ON ip_net_pool
1812+
FOR EACH ROW
1813+
EXECUTE PROCEDURE tf_kafka_produce_event();"""
17721814

17731815
upgrade = [
17741816
"""
@@ -2272,4 +2314,59 @@
22722314
-- update database schema version
22732315
COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 7';
22742316
""",
2317+
"""
2318+
--
2319+
-- Upgrade from NIPAP database schema version 7 to 8
2320+
--
2321+
2322+
--
2323+
-- Kafka event table and triggers
2324+
--
2325+
-- This table is used as a queue for the external kafka_producer process.
2326+
-- Triggers on the core tables insert events here. The daemon will enable or
2327+
-- disable these triggers at startup depending on configuration.
2328+
--
2329+
CREATE TABLE IF NOT EXISTS kafka_produce_event (
2330+
id SERIAL PRIMARY KEY,
2331+
table_name TEXT NOT NULL,
2332+
event_type TEXT NOT NULL,
2333+
payload JSONB,
2334+
processed BOOLEAN DEFAULT FALSE,
2335+
created_at TIMESTAMP WITH TIME ZONE DEFAULT now()
2336+
);
2337+
2338+
CREATE OR REPLACE FUNCTION tf_kafka_produce_event() RETURNS trigger AS $$
2339+
BEGIN
2340+
IF TG_OP = 'DELETE' THEN
2341+
INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb);
2342+
ELSIF OLD IS DISTINCT FROM NEW THEN
2343+
INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb);
2344+
END IF;
2345+
RETURN NEW;
2346+
END;
2347+
$$ LANGUAGE plpgsql;
2348+
2349+
-- Triggers that write to kafka_produce_event
2350+
CREATE TRIGGER trigger_kafka_ip_net_plan
2351+
AFTER INSERT OR UPDATE OR DELETE
2352+
ON ip_net_plan
2353+
FOR EACH ROW
2354+
EXECUTE PROCEDURE tf_kafka_produce_event();
2355+
2356+
CREATE TRIGGER trigger_kafka_ip_net_vrf
2357+
AFTER INSERT OR UPDATE OR DELETE
2358+
ON ip_net_vrf
2359+
FOR EACH ROW
2360+
EXECUTE PROCEDURE tf_kafka_produce_event();
2361+
2362+
CREATE TRIGGER trigger_kafka_ip_net_pool
2363+
AFTER INSERT OR UPDATE OR DELETE
2364+
ON ip_net_pool
2365+
FOR EACH ROW
2366+
EXECUTE PROCEDURE tf_kafka_produce_event();
2367+
2368+
2369+
-- update database schema version
2370+
COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 8';
2371+
""",
22752372
]

0 commit comments

Comments (0)