Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .config
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ gst_colorspace: BGR
; Examples: decodebin, avdec_h264, nvh264dec
gst_decoder: avdec_h264

; Max decoded frames buffered per GStreamer queue element. Higher values absorb
; processing spikes without dropping frames. On memory-constrained systems this
; can trigger a swap death spiral. Reduce if experiencing OOM even after reducing
; num_cores to 1.
gst_queue_size: 100

; Path to the camera settings json
; e.g. ~/username/source/Stations/XX0001/camera_settings.json
; or ./camera_settings.json
Expand Down
58 changes: 54 additions & 4 deletions RMS/BufferedCapture.py
Original file line number Diff line number Diff line change
Expand Up @@ -1097,13 +1097,14 @@ def createGstreamDevice(self, video_format, gst_decoder='decodebin',
).format(protocol_str, device_url)

# Branch for processing
queue_size = self.config.gst_queue_size
processing_branch = (
"t. ! queue ! {:s} ! "
"queue leaky=downstream max-size-buffers=100 max-size-bytes=0 max-size-time=0 ! "
"queue leaky=downstream max-size-buffers={:d} max-size-bytes=0 max-size-time=0 ! "
"videoconvert ! video/x-raw,format={:s} ! "
"queue max-size-buffers=100 max-size-bytes=0 max-size-time=0 ! "
"appsink max-buffers=100 drop=true sync=0 name=appsink"
).format(gst_decoder, video_format)
"queue max-size-buffers={:d} max-size-bytes=0 max-size-time=0 ! "
"appsink max-buffers={:d} drop=true sync=0 name=appsink"
).format(gst_decoder, queue_size, video_format, queue_size, queue_size)

# Branch for storage - if video_file_dir is not None, save the raw stream to a file
if video_file_dir is not None:
Expand Down Expand Up @@ -1772,6 +1773,36 @@ def run(self):

log.debug("Process-specific initialization complete")

# Prevent GStreamer and other grandchild processes from inheriting the
# large shared frame buffers (~506 MB each at 1080p). These buffers are
# only needed by BufferedCapture and Compressor; any subprocess forked
# from here (e.g. GStreamer NVENC encoder, RawFrameSaver) does not need
# them. MADV_DONTFORK tells the kernel to exclude these pages from any
# child created via fork(), so grandchildren never see them at all.
# Compressor is unaffected because it forks from StartCapture, not here.
try:
_libc = ctypes.CDLL('libc.so.6', use_errno=True)
_libc.madvise.restype = ctypes.c_int
_libc.madvise.argtypes = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_int]
MADV_DONTFORK = 10

total_bytes = 0
for arr in (self.array1, self.array2):
try:
ret = _libc.madvise(arr.ctypes.data, arr.nbytes, MADV_DONTFORK)
if ret == 0:
total_bytes += arr.nbytes
except Exception:
pass

if total_bytes:
log.debug("Marked frame buffers ({:.0f} MB) as DONTFORK to prevent "
"inheritance by grandchild processes".format(
total_bytes / (1024*1024)))

except Exception as e:
log.warning("Could not set MADV_DONTFORK on frame buffers: {}".format(e))

# Main capture loop
while not self.exit.is_set() and not self.initVideoDevice():
# Update heartbeat during connection attempts to show we're still alive
Expand Down Expand Up @@ -2185,6 +2216,25 @@ def captureFrames(self):
# Force device re-initialization by releasing and reconnecting
log.info("Releasing resources to re-initialize video device with GStreamer")
self.releaseResources()

# If transitioning to daytime, release the compression frame
# buffers back to the OS. They are not used during the day
# (frame writes are skipped in daytime mode) and would otherwise
# sit in swap until nightfall. MADV_DONTNEED drops the pages
# immediately; they get zero-filled on next write at no cost.
if current_daytime:
try:
_libc = ctypes.CDLL('libc.so.6', use_errno=True)
_libc.madvise.restype = ctypes.c_int
_libc.madvise.argtypes = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_int]
MADV_DONTNEED = 4
for arr in (self.array1, self.array2):
_libc.madvise(arr.ctypes.data, arr.nbytes, MADV_DONTNEED)
log.info("Released compression frame buffers ({:.0f} MB) back to OS".format(
sum(a.nbytes for a in (self.array1, self.array2)) / (1024*1024)))
except Exception as e:
log.warning("Could not release frame buffers: {}".format(e))

wait_for_reconnect = True
break

Expand Down
16 changes: 14 additions & 2 deletions RMS/Compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ def stop(self):
# Always join to reap zombie (returns instantly if already dead)
self.join()

else:
# Process is not alive but may not have been joined yet - reap it
log.debug("Compression process not alive, joining to reap resources")
self.join(timeout=5)

# Return the detector and live viewer objects because they were updated in this namespace
return self.detector

Expand Down Expand Up @@ -283,9 +288,9 @@ def run(self):
if self.exit.is_set():

log.debug('Compression run exit')
self.run_exited.set()

return None
self.run_exited.set()
os._exit(0)

time.sleep(0.1)

Expand Down Expand Up @@ -390,7 +395,14 @@ def run(self):


log.debug('Compression run exit')

time.sleep(1.0)
self.run_exited.set()

# Force-exit the process. The forked QueuedPool Manager proxy threads
# hold open socket connections that survive even after dropping all
# Python references. os._exit() is the only reliable way to terminate
# the process without waiting for those threads.
os._exit(0)


6 changes: 6 additions & 0 deletions RMS/ConfigReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ def __init__(self):
# Decoder for the gstreamer media backend (e.g. decodebin, avdec_h264, nvh264dec)
self.gst_decoder = "avdec_h264"

# Max buffers per GStreamer queue element (lower values reduce memory usage on multi-cam systems)
self.gst_queue_size = 100

# Path to the json file containing camera settings
self.camera_settings_path = "./camera_settings.json"

Expand Down Expand Up @@ -1161,6 +1164,9 @@ def parseCapture(config, parser):
if parser.has_option(section, "gst_decoder"):
config.gst_decoder = parser.get(section, "gst_decoder")

if parser.has_option(section, "gst_queue_size"):
config.gst_queue_size = parser.getint(section, "gst_queue_size")

if parser.has_option(section, "camera_settings_path") and os.path.isfile(parser.get(section, "camera_settings_path")):
config.camera_settings_path = parser.get(section, "camera_settings_path")
else:
Expand Down
1 change: 1 addition & 0 deletions RMS/DetectStarsAndMeteors.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ def detectStarsAndMeteorsDirectory(dir_path, config, output_suffix=''):

# Get the detection results from the queue
detection_results = detector.getResults()
detector.shutdownManager()


# Save detection to disk
Expand Down
1 change: 1 addition & 0 deletions RMS/ExtractStars.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,7 @@ def extractStarsAndSave(config, ff_dir):
workpool.closePool()

results = workpool.getResults()
workpool.shutdownManager()


# Get extraction results
Expand Down
23 changes: 17 additions & 6 deletions RMS/QueuedPool.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,17 @@ def __init__(self, func, cores=None, log=None, delay_start=0, worker_timeout=200
self.func_kwargs = func_kwargs
self.worker_wait_inbetween_jobs = worker_wait_inbetween_jobs

# Initialize queues (for some reason queues from Manager need to be created, otherwise they are
# Initialize queues (for some reason queues from Manager need to be created, otherwise they are
# blocking when using get_nowait)
manager = multiprocessing.Manager()
self.manager = multiprocessing.Manager()

# Only init with maxsize if given, otherwise it return a TypeError when fed data from Compressor
if input_queue_maxsize is None:
self.input_queue = manager.Queue()
self.input_queue = self.manager.Queue()
else:
self.input_queue = manager.Queue(maxsize=input_queue_maxsize)
self.input_queue = self.manager.Queue(maxsize=input_queue_maxsize)

self.output_queue = manager.Queue()
self.output_queue = self.manager.Queue()

self.func = func
self.pool = None
Expand Down Expand Up @@ -463,7 +463,7 @@ def closePool(self):
break




# If all workers are idle, set the last idle time
if all_workers_idle_time is None:
Expand Down Expand Up @@ -544,6 +544,17 @@ def closePool(self):



def shutdownManager(self):
""" Shut down the Manager server process. Call this after all results have been collected. """

if hasattr(self, 'manager') and self.manager is not None:
try:
self.manager.shutdown()
except Exception:
pass
self.manager = None


def updateCoreNumber(self, cores=None):
""" Update the number of cores/workers used by the pool.

Expand Down
3 changes: 3 additions & 0 deletions RMS/StartCapture.py
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,9 @@ def runCapture(config, duration=None, video_file=None, nodetect=False, detect_en
# Get the detection results from the queue
detection_results = detector.getResults()

# Shut down the Manager server process now that results are collected
detector.shutdownManager()

else:

detection_results = []
Expand Down
Loading