Skip to content

Commit e91ca56

Browse files
authored
Fix NetBox state change race conditions with unified locking and connection limiting (#1839)
Implemented critical fixes to prevent race conditions when multiple state changes occur simultaneously for NetBox nodes:

1. Unified Lock Keys:
- All three state-setting functions now use the same lock key per device
- Previously each function used different locks, allowing parallel execution
- Prevents race conditions when multiple states change in quick succession
- Fixed incorrect lock key in set_power_state (was using the provision_state key)
- Lock key: lock_osism_tasks_netbox_{device_name}

2. NetBox Connection Limiting:
- Implemented a custom Redis-based semaphore (pottery lacks RedisSemaphore)
- Default: maximum 5 concurrent operations per NetBox instance
- Configurable via the NETBOX_MAX_CONNECTIONS environment variable
- Prevents overloading NetBox services during parallel operations
- Works for both primary and secondary NetBox instances
- Uses Redis sorted sets for distributed coordination

3. Increased Timeouts:
- Lock acquisition timeout: 20s -> 120s (2 minutes)
- Auto-release time: 60s -> 300s (5 minutes)
- Handles high-concurrency scenarios during node provisioning and sync

Technical Implementation:
- Custom RedisSemaphore class using Redis ZSET operations
- Helper function _update_netbox_device_field() wraps all NetBox updates
- Per-URL semaphore with MD5 hash-based key generation
- Automatic cleanup of expired semaphore holders (60s TTL)

AI-assisted: Claude Code
Signed-off-by: Christian Berendt <berendt@osism.tech>
1 parent e8dc124 commit e91ca56

3 files changed

Lines changed: 149 additions & 39 deletions

File tree

osism/settings.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,6 @@ def read_secret(secret_name):
5959

6060
# Redfish connection timeout in seconds
6161
REDFISH_TIMEOUT = int(os.getenv("REDFISH_TIMEOUT", "20"))
62+
63+
# NetBox connection limiting: maximum number of concurrent operations per NetBox instance
64+
NETBOX_MAX_CONNECTIONS = int(os.getenv("NETBOX_MAX_CONNECTIONS", "5"))

osism/tasks/netbox.py

Lines changed: 49 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,28 @@ def setup_periodic_tasks(sender, **kwargs):
1515
pass
1616

1717

18+
def _update_netbox_device_field(nb, device_name, field_name, value):
    """Set a single custom field on a NetBox device under the connection limit.

    The lookup and the save both happen while a slot of the per-instance
    semaphore (keyed by ``nb.base_url``) is held, so the number of concurrent
    requests against one NetBox instance stays bounded.

    Args:
        nb: NetBox API instance
        device_name: Name of the device to look up
        field_name: Name of the custom field to write
        value: New value for the custom field

    Returns:
        bool: True when the device was found and saved, False when no device
        with that name exists.

    Raises:
        Any exception raised while entering the semaphore is propagated
        unchanged (no slot could be obtained in time).
    """
    with utils.create_netbox_semaphore(nb.base_url):
        target = nb.dcim.devices.get(name=device_name)

        # Nothing to update when the device is unknown to this NetBox instance.
        if not target:
            return False

        target.custom_fields.update({field_name: value})
        target.save()
        return True
38+
39+
1840
def _matches_netbox_filter(nb, netbox_filter, is_primary=False):
1941
"""Check if a NetBox instance matches the given filter.
2042
@@ -84,21 +106,19 @@ def set_maintenance(
84106
utils.check_task_lock_and_exit()
85107

86108
lock = utils.create_redlock(
87-
key=f"lock_osism_tasks_netbox_set_maintenance_{device_name}",
88-
auto_release_time=60,
109+
key=f"lock_osism_tasks_netbox_{device_name}",
110+
auto_release_time=300,
89111
)
90-
if lock.acquire(timeout=20):
112+
if lock.acquire(timeout=120):
91113
try:
92114
# Process primary NetBox
93115
if _matches_netbox_filter(utils.nb, netbox_filter, is_primary=True):
94116
logger.info(
95117
f"Set maintenance state of device {device_name} = {state} on {utils.nb.base_url}"
96118
)
97-
device = utils.nb.dcim.devices.get(name=device_name)
98-
if device:
99-
device.custom_fields.update({"maintenance": state})
100-
device.save()
101-
else:
119+
if not _update_netbox_device_field(
120+
utils.nb, device_name, "maintenance", state
121+
):
102122
logger.error(
103123
f"Could not set maintenance for {device_name} on {utils.nb.base_url}"
104124
)
@@ -123,11 +143,9 @@ def set_maintenance(
123143
logger.info(
124144
f"Set maintenance state of device {device_name} = {state} on {nb.base_url}"
125145
)
126-
device = nb.dcim.devices.get(name=device_name)
127-
if device:
128-
device.custom_fields.update({"maintenance": state})
129-
device.save()
130-
else:
146+
if not _update_netbox_device_field(
147+
nb, device_name, "maintenance", state
148+
):
131149
logger.error(
132150
f"Could not set maintenance for {device_name} on {nb.base_url}"
133151
)
@@ -156,21 +174,19 @@ def set_provision_state(
156174
utils.check_task_lock_and_exit()
157175

158176
lock = utils.create_redlock(
159-
key=f"lock_osism_tasks_netbox_set_provision_state_{device_name}",
160-
auto_release_time=60,
177+
key=f"lock_osism_tasks_netbox_{device_name}",
178+
auto_release_time=300,
161179
)
162-
if lock.acquire(timeout=20):
180+
if lock.acquire(timeout=120):
163181
try:
164182
# Process primary NetBox
165183
if _matches_netbox_filter(utils.nb, netbox_filter, is_primary=True):
166184
logger.info(
167185
f"Set provision state of device {device_name} = {state} on {utils.nb.base_url}"
168186
)
169-
device = utils.nb.dcim.devices.get(name=device_name)
170-
if device:
171-
device.custom_fields.update({"provision_state": state})
172-
device.save()
173-
else:
187+
if not _update_netbox_device_field(
188+
utils.nb, device_name, "provision_state", state
189+
):
174190
logger.error(
175191
f"Could not set provision state for {device_name} on {utils.nb.base_url}"
176192
)
@@ -195,11 +211,9 @@ def set_provision_state(
195211
logger.info(
196212
f"Set provision state of device {device_name} = {state} on {nb.base_url}"
197213
)
198-
device = nb.dcim.devices.get(name=device_name)
199-
if device:
200-
device.custom_fields.update({"provision_state": state})
201-
device.save()
202-
else:
214+
if not _update_netbox_device_field(
215+
nb, device_name, "provision_state", state
216+
):
203217
logger.error(
204218
f"Could not set provision state for {device_name} on {nb.base_url}"
205219
)
@@ -232,21 +246,19 @@ def set_power_state(
232246
utils.check_task_lock_and_exit()
233247

234248
lock = utils.create_redlock(
235-
key=f"lock_osism_tasks_netbox_set_provision_state_{device_name}",
236-
auto_release_time=60,
249+
key=f"lock_osism_tasks_netbox_{device_name}",
250+
auto_release_time=300,
237251
)
238-
if lock.acquire(timeout=20):
252+
if lock.acquire(timeout=120):
239253
try:
240254
# Process primary NetBox
241255
if _matches_netbox_filter(utils.nb, netbox_filter, is_primary=True):
242256
logger.info(
243257
f"Set power state of device {device_name} = {state} on {utils.nb.base_url}"
244258
)
245-
device = utils.nb.dcim.devices.get(name=device_name)
246-
if device:
247-
device.custom_fields.update({"power_state": state})
248-
device.save()
249-
else:
259+
if not _update_netbox_device_field(
260+
utils.nb, device_name, "power_state", state
261+
):
250262
logger.error(
251263
f"Could not set power state for {device_name} on {utils.nb.base_url}"
252264
)
@@ -271,11 +283,9 @@ def set_power_state(
271283
logger.info(
272284
f"Set power state of device {device_name} = {state} on {nb.base_url}"
273285
)
274-
device = nb.dcim.devices.get(name=device_name)
275-
if device:
276-
device.custom_fields.update({"power_state": state})
277-
device.save()
278-
else:
286+
if not _update_netbox_device_field(
287+
nb, device_name, "power_state", state
288+
):
279289
logger.error(
280290
f"Could not set power state for {device_name} on {nb.base_url}"
281291
)

osism/utils/__init__.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-License-Identifier: Apache-2.0
22

33
import time
4+
import uuid
45
import os
56
from contextlib import redirect_stdout, redirect_stderr
67
from cryptography.fernet import Fernet
@@ -18,6 +19,74 @@
1819
from osism import settings
1920

2021

22+
class RedisSemaphore:
    """Redis-based distributed semaphore for limiting concurrent operations.

    Holders are tracked as members of a Redis sorted set stored under
    ``semaphore:<key>``. Each member is a random UUID whose score is the
    wall-clock time at which the slot was acquired. Every acquisition attempt
    first removes members older than ``HOLDER_TTL`` seconds, so a holder that
    never releases stops counting once that age is reached. A holder that
    legitimately runs longer than ``HOLDER_TTL`` is dropped in the same way.

    NOTE(review): the capacity check (ZCARD) and the registration (ZADD) are
    two separate round trips. Two clients racing for the last free slot can
    both pass the check and briefly exceed ``maxsize``. Performing the
    check-and-add atomically on the server would close that window.
    """

    # Seconds after which a holder entry is considered abandoned and removed.
    HOLDER_TTL = 60
    # Seconds to sleep between attempts while the semaphore is full.
    RETRY_INTERVAL = 0.01
    # Timeout in seconds used when neither acquire() nor __init__ provides one.
    DEFAULT_TIMEOUT = 10

    def __init__(self, redis_client, key, maxsize, timeout=None):
        """Initialize the semaphore.

        Args:
            redis_client: Redis client instance
            key: Redis key for this semaphore (stored as ``semaphore:<key>``)
            maxsize: Maximum number of concurrent holders
            timeout: Optional default timeout for acquisition in seconds
        """
        self.redis = redis_client
        self.key = f"semaphore:{key}"
        self.maxsize = maxsize
        self.timeout = timeout
        # UUID of the slot currently held by this instance, None when not held.
        self.identifier = None

    def acquire(self, timeout=None):
        """Acquire the semaphore, polling until a slot is free or time runs out.

        Args:
            timeout: Maximum time to wait in seconds. ``None`` falls back to
                the instance timeout and then to ``DEFAULT_TIMEOUT``. A value
                of ``0`` performs exactly one non-blocking attempt.

        Returns:
            bool: True if acquired, False if timeout
        """
        # Compare against None explicitly so that an explicit 0 is honoured
        # instead of being replaced by the fallback values.
        if timeout is None:
            timeout = self.DEFAULT_TIMEOUT if self.timeout is None else self.timeout

        identifier = str(uuid.uuid4())
        # The deadline uses the monotonic clock so that wall-clock adjustments
        # cannot shorten or extend the wait.
        deadline = time.monotonic() + timeout

        while True:
            # Re-read the wall clock on every attempt: holders must be able to
            # expire while we are waiting, and the score we store must be the
            # moment the slot was actually taken.
            now = time.time()

            # Clean up expired holders
            self.redis.zremrangebyscore(self.key, 0, now - self.HOLDER_TTL)

            # Try to acquire
            if self.redis.zcard(self.key) < self.maxsize:
                self.redis.zadd(self.key, {identifier: now})
                self.identifier = identifier
                return True

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False

            # Never sleep past the deadline.
            time.sleep(min(self.RETRY_INTERVAL, remaining))

    def release(self):
        """Release the semaphore if this instance currently holds a slot."""
        if self.identifier:
            self.redis.zrem(self.key, self.identifier)
            self.identifier = None

    def __enter__(self):
        """Context manager entry.

        Raises:
            TimeoutError: If no slot could be acquired within the timeout.
        """
        if not self.acquire():
            raise TimeoutError(f"Could not acquire semaphore {self.key}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: release the slot, never suppress exceptions."""
        self.release()
        return False
88+
89+
2190
class TimeoutHTTPAdapter(HTTPAdapter):
2291
"""HTTPAdapter that sets a default timeout for all requests."""
2392

@@ -291,6 +360,34 @@ def create_redlock(key, auto_release_time=3600):
291360
)
292361

293362

363+
def create_netbox_semaphore(netbox_url, max_connections=None):
    """
    Build the connection-limiting semaphore for one NetBox instance.

    The semaphore key is derived from the first eight hex characters of the
    MD5 digest of ``netbox_url``, so every caller passing the identical URL
    string shares the same semaphore. Acquisition waits at most 30 seconds.

    Args:
        netbox_url (str): URL identifying the NetBox instance
        max_connections (int): Upper bound of concurrent holders; when None,
            settings.NETBOX_MAX_CONNECTIONS is used

    Returns:
        RedisSemaphore: Semaphore bound to the module-level Redis client
    """
    import hashlib

    if max_connections is None:
        limit = settings.NETBOX_MAX_CONNECTIONS
    else:
        limit = max_connections

    # The digest only serves to build a short, key-safe identifier per URL.
    digest = hashlib.md5(netbox_url.encode()).hexdigest()

    return RedisSemaphore(
        redis,
        f"netbox_semaphore_{digest[:8]}",
        limit,
        timeout=30,
    )
389+
390+
294391
def set_task_lock(user=None, reason=None):
295392
"""
296393
Set task lock to prevent new tasks from starting.

0 commit comments

Comments
 (0)