From 4441aaa879ad77668a96e56dcfc6ff39b20947f9 Mon Sep 17 00:00:00 2001 From: kyle-fitzp Date: Wed, 29 Apr 2026 19:55:21 -0500 Subject: [PATCH 01/13] Add RTMP driver --- sensors/video/sensorhub-driver-rtmp/README.md | 7 + .../video/sensorhub-driver-rtmp/build.gradle | 28 ++ .../sensorhub/impl/sensor/rtmp/Activator.java | 13 + .../impl/sensor/rtmp/RtmpDescriptor.java | 41 +++ .../impl/sensor/rtmp/RtmpDriver.java | 264 ++++++++++++++++++ .../impl/sensor/rtmp/RtmpUrlArbiter.java | 16 ++ .../sensor/rtmp/config/ConnectionConfig.java | 24 ++ .../impl/sensor/rtmp/config/HostType.java | 14 + .../impl/sensor/rtmp/config/RtmpConfig.java | 31 ++ .../org.sensorhub.api.module.IModuleProvider | 1 + 10 files changed, 439 insertions(+) create mode 100644 sensors/video/sensorhub-driver-rtmp/README.md create mode 100644 sensors/video/sensorhub-driver-rtmp/build.gradle create mode 100644 sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/Activator.java create mode 100644 sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDescriptor.java create mode 100644 sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java create mode 100644 sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpUrlArbiter.java create mode 100644 sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java create mode 100644 sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java create mode 100644 sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/RtmpConfig.java create mode 100644 sensors/video/sensorhub-driver-rtmp/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider diff --git a/sensors/video/sensorhub-driver-rtmp/README.md b/sensors/video/sensorhub-driver-rtmp/README.md new file mode 100644 index 000000000..3f5285797 --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/README.md @@ -0,0 +1,7 @@ +### Axis PTZ (Pan-Tilt-Zoom) Video Camera + +OSH sensor adaptor supporting output (video and PTZ settings) and tasking (camera and PTZ) for Axis cameras. + +This driver depends on the following modules at runtime: + * sensorhub-driver-rtpcam + * sensorhub-driver-videocam diff --git a/sensors/video/sensorhub-driver-rtmp/build.gradle b/sensors/video/sensorhub-driver-rtmp/build.gradle new file mode 100644 index 000000000..133071bd2 --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/build.gradle @@ -0,0 +1,28 @@ +description = 'RTMP Video Driver' +ext.details = 'Driver to listen for and demux RTMP video streams' +version = '1.0.0' + +dependencies { + implementation 'org.sensorhub:sensorhub-core:' + oshCoreVersion + implementation project(':sensorhub-driver-ffmpeg') +} + +// add info to OSGi manifest +osgi { + manifest { + attributes('Bundle-Vendor': 'Botts Innovative Research, Inc.') + attributes('Bundle-Activator': 'org.sensorhub.impl.sensor.rtmp.Activator') + } +} + +// add info to maven pom +ext.pom >>= { + developers { + developer { + id 'kyle-fitzp' + name 'Kyle Fitzpatrick' + organization 'GeoRobotix Inc.' + organizationUrl 'http://www.georobotix.com' + } + } +} diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/Activator.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/Activator.java new file mode 100644 index 000000000..8efb94e8b --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/Activator.java @@ -0,0 +1,13 @@ +package org.sensorhub.impl.sensor.rtmp; + +import org.osgi.framework.BundleActivator; +import org.sensorhub.utils.OshBundleActivator; + + +/* + * Needed to expose java services as OSGi services + */ +public class Activator extends OshBundleActivator implements BundleActivator +{ + +} diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDescriptor.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDescriptor.java new file mode 100644 index 000000000..73724e3c4 --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDescriptor.java @@ -0,0 +1,41 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + +The contents of this file are subject to the Mozilla Public License, v. 2.0. +If a copy of the MPL was not distributed with this file, You can obtain one +at http://mozilla.org/MPL/2.0/. + +Software distributed under the License is distributed on an "AS IS" basis, +WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +for the specific language governing rights and limitations under the License. + +The Initial Developer is Botts Innovative Research Inc.. Portions created by the Initial +Developer are Copyright (C) 2014 the Initial Developer. All Rights Reserved. + +******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.sensor.rtmp; + +import org.sensorhub.api.module.IModule; +import org.sensorhub.api.module.IModuleProvider; +import org.sensorhub.api.module.ModuleConfig; +import org.sensorhub.impl.module.JarModuleProvider; +import org.sensorhub.impl.sensor.rtmp.config.RtmpConfig; + + +public class RtmpDescriptor extends JarModuleProvider implements IModuleProvider +{ + + @Override + public Class> getModuleClass() + { + return RtmpDriver.class; + } + + + @Override + public Class getModuleConfigClass() + { + return RtmpConfig.class; + } + +} diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java new file mode 100644 index 000000000..2f108ea48 --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java @@ -0,0 +1,264 @@ +package org.sensorhub.impl.sensor.rtmp; +import org.sensorhub.api.common.SensorHubException; +import org.sensorhub.api.sensor.SensorException; +import org.sensorhub.impl.sensor.AbstractSensorModule; +import org.sensorhub.impl.sensor.ffmpeg.outputs.AudioOutput; +import org.sensorhub.impl.sensor.ffmpeg.outputs.VideoOutput; +import org.sensorhub.impl.sensor.rtmp.config.HostType; +import org.sensorhub.impl.sensor.rtmp.config.RtmpConfig; +import org.sensorhub.mpegts.MpegTsProcessor; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public class RtmpDriver extends AbstractSensorModule { + private static final RtmpUrlArbiter urlArbiter = new RtmpUrlArbiter(); + private ExecutorService executorService; + private ExecutorService videoExecutorService; + private ExecutorService audioExecutorService; + private ScheduledExecutorService heartbeatExecutorService; + + final AtomicReference mpegTsProcessor = new AtomicReference<>(); + final AtomicReference> videoOutput = new AtomicReference<>(); + final AtomicReference> audioOutput = new AtomicReference<>(); + String connectionUrl = ""; + volatile boolean hasConnected = false; + + + @Override + protected void doInit() throws SensorHubException { + super.doInit(); + + urlArbiter.removeConnection(connectionUrl); + + setConnectionUrl(); + + if (!urlArbiter.addConnection(connectionUrl)) { + throw new SensorException("RTMP url already in use: " + connectionUrl); + } + + createMpegTsProcessor(); + } + + private void createMpegTsProcessor() { + String commandLineArgs = "-timeout 0 -listen 1"; + + var mpegts = new MpegTsProcessor(connectionUrl, commandLineArgs); + mpegts.setInjectVideoExtradata(true); + mpegTsProcessor.set(mpegts); + } + + private void setConnectionUrl() throws SensorException { + var connectionConfig = config.connectionConfig; + StringBuilder sb = new StringBuilder("rtmp://"); + + if (connectionConfig.host == HostType.OVERRIDE) { + if (connectionConfig.hostOverride == null || connectionConfig.hostOverride.isBlank()) { + throw new SensorException("Domain override is not set"); + } + sb.append(connectionConfig.hostOverride); + } else { + sb.append(connectionConfig.host.host); + } + + sb.append(":").append(connectionConfig.port); + + if (connectionConfig.path != null && !connectionConfig.path.isBlank()) { + if (!connectionConfig.path.startsWith("/")) { + sb.append("/"); + } + sb.append(connectionConfig.path.trim()); + } + + connectionUrl = sb.toString(); + } + + + @Override + protected void afterStart() throws SensorHubException { + super.afterStart(); + //stopStream(); + hasConnected = false; + + if (mpegTsProcessor.get() == null) { + createMpegTsProcessor(); + } else if (mpegTsProcessor.get().getState() != Thread.State.NEW) { + stopStream(); + createMpegTsProcessor(); + } + + stopExecutors(); + executorService = Executors.newSingleThreadExecutor(); + executorService.submit(this::startStream); + heartbeatExecutorService = Executors.newSingleThreadScheduledExecutor(); + heartbeatExecutorService.scheduleAtFixedRate(this::heartbeat, 5, 5, java.util.concurrent.TimeUnit.SECONDS); + heartbeatExecutorService.submit(this::heartbeat); + } + + private void startStream() { + reportStatus("Listening on: " + connectionUrl); + boolean status; + + var mpegts = mpegTsProcessor.get(); + + if (mpegts == null) { + logger.error("Could not start; stream processor is null"); + return; + } + status = mpegts.openStream(); + + if (!status) { + String error = "Failed to connect to " + connectionUrl; + reportError(error, new SensorException(error)); + return; + } + + synchronized (mpegTsProcessor) { + mpegts = mpegTsProcessor.get(); + + if (mpegts == null) { + logger.error("Could not start; stream processor is null"); + return; + } + + if (mpegts.isStreamOpened()) { + if (mpegts.hasVideoStream()) { + createVideoOutput(mpegts.getVideoStreamFrameDimensions(), mpegts.getVideoCodecName()); + mpegts.setVideoDataBufferListener(videoOutput.get()); + } + + if (mpegts.hasAudioStream()) { + createAudioOutput(mpegts.getAudioSampleRate(), mpegts.getAudioCodecName()); + mpegts.setAudioDataBufferListener(audioOutput.get()); + } + + } + clearStatus(); + reportStatus("RTMP stream for " + connectionUrl + " opened."); + hasConnected = true; + mpegts.processStream(); + /* + executorService.submit(() -> { + MpegTsProcessor processor; + while ((processor = mpegTsProcessor.get()) != null) { + + while (processor.isStreamOpened()) { + processor.processP(); + } + if (!Thread.currentThread().isInterrupted()) { + reportStatus("RTMP stream " + connectionUrl + " lost connection. Reconnecting..."); + processor.openStream(); + } else { + return; + } + } + reportStatus("RTMP stream closed."); + + }); + + */ + //mpegts.processStream(); + } + } + + private void heartbeat() { + var mpegts = mpegTsProcessor.get(); + if (mpegts == null || !hasConnected) { return; } + + if (!mpegts.isStreamOpened()) { + reportStatus("RTMP stream " + connectionUrl + " lost connection. Reconnecting..."); + if (mpegts.openStream()) { + reportStatus("RTMP stream for " + connectionUrl + " opened."); + } + } + } + + protected void createVideoOutput(int[] videoDims, String codecName) { + synchronized (videoOutput) { + var videoOut = new VideoOutput<>(this, videoDims, codecName); + videoOutput.set(videoOut); + + if (videoExecutorService != null) { + videoExecutorService.shutdown(); + } + + videoExecutorService = Executors.newSingleThreadExecutor(); + videoOut.setExecutor(videoExecutorService); + videoOut.doInit(); + addOutput(videoOut, false); + } + } + + protected void createAudioOutput(int sampleRate, String codecName) { + synchronized (audioOutput) { + var audioOut = new AudioOutput<>(this, sampleRate, codecName); + audioOutput.set(audioOut); + + if (audioExecutorService != null) { + audioExecutorService.shutdown(); + } + audioExecutorService = Executors.newSingleThreadExecutor(); + audioOut.setExecutor(audioExecutorService); + audioOut.doInit(); + addOutput(audioOut, false); + } + } + + + @Override + protected void doStop() throws SensorHubException { + super.doStop(); + stopStream(); + stopExecutors(); + urlArbiter.removeConnection(connectionUrl); + } + + private void stopStream() { + synchronized (mpegTsProcessor) { + var mpegts = mpegTsProcessor.get(); + if (mpegts != null) { + mpegts.stopProcessingStream(); + try { + logger.info("Waiting for stream to stop."); + mpegts.join(10000); + } catch (InterruptedException e) { + logger.error("Interrupted while waiting for stream to stop.", e); + } + mpegts.closeStream(); + } + mpegTsProcessor.set(null); + } + } + + private void stopExecutors() { + if (executorService != null) { + executorService.shutdownNow(); + } + if (videoExecutorService != null) { + videoExecutorService.shutdownNow(); + } + if (audioExecutorService != null) { + audioExecutorService.shutdownNow(); + } + if (heartbeatExecutorService != null) { + heartbeatExecutorService.shutdownNow(); + } + } + + + @Override + public void cleanup() throws SensorHubException { + super.cleanup(); + stopStream(); + stopExecutors(); + urlArbiter.removeConnection(connectionUrl); + } + + @Override + public boolean isConnected() { + return isStarted() && mpegTsProcessor.get() != null; + } +} \ No newline at end of file diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpUrlArbiter.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpUrlArbiter.java new file mode 100644 index 000000000..744ea9133 --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpUrlArbiter.java @@ -0,0 +1,16 @@ +package org.sensorhub.impl.sensor.rtmp; + +import java.util.HashSet; +import java.util.Set; + +public class RtmpUrlArbiter { + private final Set urls = new HashSet<>(); + + public synchronized boolean addConnection(String url) { + return urls.add(url); + } + + public synchronized void removeConnection(String url) { + urls.remove(url); + } +} diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java new file mode 100644 index 000000000..46675376a --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java @@ -0,0 +1,24 @@ +package org.sensorhub.impl.sensor.rtmp.config; + +import org.sensorhub.api.config.DisplayInfo; + +public class ConnectionConfig { + + @DisplayInfo.Required + @DisplayInfo(label = "Host", desc = "Domain listening for an RTMP connection request. Unspecified should work " + + "for most cases.") + public HostType host = HostType.UNSPECIFIED; + + @DisplayInfo(label = "Host Override", desc = "(Optional) Override the host listening for an RTMP connection request." + + "\nHost must be set to OVERRIDE.") + public String hostOverride = ""; + + @DisplayInfo.Required + @DisplayInfo(label = "Port", desc = "Port listening for an RTMP connection request.") + @DisplayInfo.ValueRange(min = 1, max = 65535) + public int port = 1935; + + @DisplayInfo(label = "Path", desc = "(Optional) Path to listen for an RTMP connection request. I.e. everything in the URL " + + "after the port.") + public String path = ""; +} \ No newline at end of file diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java new file mode 100644 index 000000000..c5cca64df --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java @@ -0,0 +1,14 @@ +package org.sensorhub.impl.sensor.rtmp.config; + +public enum HostType { + UNSPECIFIED("0.0.0.0"), + LOCALHOST("localhost"), + DOCKER_INTERNAL("host.docker.internal"), + OVERRIDE(""); + + public final String host; + + HostType(String host) { + this.host = host; + } +} diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/RtmpConfig.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/RtmpConfig.java new file mode 100644 index 000000000..ffc45e138 --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/RtmpConfig.java @@ -0,0 +1,31 @@ +package org.sensorhub.impl.sensor.rtmp.config; + +import org.sensorhub.api.config.DisplayInfo; +import org.sensorhub.api.sensor.PositionConfig; +import org.sensorhub.api.sensor.SensorConfig; + +public class RtmpConfig extends SensorConfig { + @DisplayInfo.Required + @DisplayInfo(label = "Video Stream ID", desc = "Serial number or unique identifier for video stream.") + public String serialNumber = "video001"; + + @DisplayInfo.Required + @DisplayInfo(label = "Connection", desc = "Configuration options for source of RTMP.") + public ConnectionConfig connectionConfig = new ConnectionConfig(); + + /** + * Configuration options for the location and orientation of the sensor. + */ + @DisplayInfo(label = "Position", desc = "Location and orientation of the sensor.") + public PositionConfig positionConfig = new PositionConfig(); + + @Override + public PositionConfig.LLALocation getLocation() { + return positionConfig.location; + } + + @Override + public PositionConfig.EulerOrientation getOrientation() { + return positionConfig.orientation; + } +} diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider b/sensors/video/sensorhub-driver-rtmp/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider new file mode 100644 index 000000000..e9ee1b390 --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider @@ -0,0 +1 @@ +org.sensorhub.impl.sensor.rtmp.RtmpDescriptor \ No newline at end of file From ecd3b3ba00910b45304fd0b7555be793c8356760 Mon Sep 17 00:00:00 2001 From: kyle-fitzp Date: Wed, 29 Apr 2026 20:12:29 -0500 Subject: [PATCH 02/13] Improve URL deconfliction --- .../org/sensorhub/impl/sensor/rtmp/RtmpDriver.java | 11 +++++++++-- .../sensorhub/impl/sensor/rtmp/RtmpUrlArbiter.java | 14 +++++++++++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java index 2f108ea48..e9d1efb9e 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java @@ -8,6 +8,7 @@ import org.sensorhub.impl.sensor.rtmp.config.RtmpConfig; import org.sensorhub.mpegts.MpegTsProcessor; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -32,12 +33,18 @@ public class RtmpDriver extends AbstractSensorModule { protected void doInit() throws SensorHubException { super.doInit(); + if (getUniqueIdentifier() == null) { + generateUniqueID("urn:osh:sensor:rtmp:", config.serialNumber); + generateXmlID("RTMP_", config.serialNumber); + } + urlArbiter.removeConnection(connectionUrl); setConnectionUrl(); - if (!urlArbiter.addConnection(connectionUrl)) { - throw new SensorException("RTMP url already in use: " + connectionUrl); + String moduleUid; + if ((moduleUid = urlArbiter.addConnection(connectionUrl, this.getUniqueIdentifier())) != null) { + throw new SensorException("RTMP url already in use by module: " + moduleUid); } createMpegTsProcessor(); diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpUrlArbiter.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpUrlArbiter.java index 744ea9133..e9e77d3bf 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpUrlArbiter.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpUrlArbiter.java @@ -1,13 +1,21 @@ package org.sensorhub.impl.sensor.rtmp; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; public class RtmpUrlArbiter { - private final Set urls = new HashSet<>(); + private final Map urls = new HashMap<>(); - public synchronized boolean addConnection(String url) { - return urls.add(url); + // If successful, returns null, otherwise returns the moduleUid of the existing connection + public synchronized String addConnection(String url, String moduleUid) { + if (urls.containsKey(url)) { + return urls.get(url); + } else { + urls.put(url, moduleUid); + return null; + } } public synchronized void removeConnection(String url) { From 399ee301fe1c5d6b423c4956d30c355391e9a0c0 Mon Sep 17 00:00:00 2001 From: kyle-fitzp Date: Tue, 5 May 2026 13:24:03 -0500 Subject: [PATCH 03/13] Add avformatcontext callback for open stream termination --- .../org/sensorhub/mpegts/MpegTsProcessor.java | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/sensors/video/sensorhub-driver-ffmpeg/src/main/java/org/sensorhub/mpegts/MpegTsProcessor.java b/sensors/video/sensorhub-driver-ffmpeg/src/main/java/org/sensorhub/mpegts/MpegTsProcessor.java index c3da173af..2de567d7a 100644 --- a/sensors/video/sensorhub-driver-ffmpeg/src/main/java/org/sensorhub/mpegts/MpegTsProcessor.java +++ b/sensors/video/sensorhub-driver-ffmpeg/src/main/java/org/sensorhub/mpegts/MpegTsProcessor.java @@ -14,12 +14,14 @@ import org.bytedeco.ffmpeg.avcodec.AVCodecParameters; import org.bytedeco.ffmpeg.avcodec.AVPacket; import org.bytedeco.ffmpeg.avformat.AVFormatContext; +import org.bytedeco.ffmpeg.avformat.AVIOInterruptCB; import org.bytedeco.ffmpeg.avformat.AVInputFormat; import org.bytedeco.ffmpeg.avutil.AVDictionary; import org.bytedeco.ffmpeg.avutil.AVRational; import org.bytedeco.ffmpeg.global.avcodec; import org.bytedeco.ffmpeg.global.avformat; import org.bytedeco.ffmpeg.global.avutil; +import org.bytedeco.javacpp.Pointer; import org.bytedeco.javacpp.PointerPointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,8 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.bytedeco.ffmpeg.global.avdevice.avdevice_register_all; -import static org.bytedeco.ffmpeg.global.avformat.av_find_input_format; -import static org.bytedeco.ffmpeg.global.avformat.av_read_frame; +import static org.bytedeco.ffmpeg.global.avformat.*; /** * The class provides a wrapper to bytedeco.org JavaCpp-Platform. @@ -98,6 +99,13 @@ public class MpegTsProcessor extends Thread { */ private AVFormatContext avFormatContext; + /** + * Callbacks allow some blocking AVFormatContext operations to terminate early on + * stream close. These are global for GC purposes; do not directly modify. + */ + private AVIOInterruptCB.Callback_Pointer callbackPointer; + private AVIOInterruptCB interruptCallback; + /** * Flag indicating if processing of the transport stream should be terminated. */ @@ -186,7 +194,17 @@ public boolean openStream() { avformat.avformat_network_init(); // Create a new AV Format Context for I/O - avFormatContext = new AVFormatContext(null); + avFormatContext = avformat_alloc_context(); + + // Allow for context to be interrupted after attempt to stop stream + interruptCallback = avFormatContext.interrupt_callback(); + callbackPointer = new AVIOInterruptCB.Callback_Pointer() { + @Override + public int call(Pointer opaque) { + return terminateProcessing.get() ? 1 : 0; + } + }; + interruptCallback.callback(callbackPointer); AVInputFormat inputFormat = null; @@ -526,11 +544,8 @@ public void closeStream() { */ public void stopProcessingStream() { logger.debug("stopProcessingStream"); - - if (streamOpened) { - loop = false; - terminateProcessing.set(true); - } + loop = false; + terminateProcessing.set(true); } /** From 786790caf381bff0af6c63162813400649bf4920 Mon Sep 17 00:00:00 2001 From: kyle-fitzp Date: Tue, 5 May 2026 13:26:35 -0500 Subject: [PATCH 04/13] Only deconflict ports, not URLs --- .../rtmp/{RtmpUrlArbiter.java => RtmpPortArbiter.java} | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) rename sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/{RtmpUrlArbiter.java => RtmpPortArbiter.java} (58%) diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpUrlArbiter.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortArbiter.java similarity index 58% rename from sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpUrlArbiter.java rename to sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortArbiter.java index e9e77d3bf..ff9433ddc 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpUrlArbiter.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortArbiter.java @@ -1,15 +1,13 @@ package org.sensorhub.impl.sensor.rtmp; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; -public class RtmpUrlArbiter { - private final Map urls = new HashMap<>(); +public class RtmpPortArbiter { + private final Map urls = new HashMap<>(); // If successful, returns null, otherwise returns the moduleUid of the existing connection - public synchronized String addConnection(String url, String moduleUid) { + public synchronized String addConnection(int url, String moduleUid) { if (urls.containsKey(url)) { return urls.get(url); } else { @@ -18,7 +16,7 @@ public synchronized String addConnection(String url, String moduleUid) { } } - public synchronized void removeConnection(String url) { + public synchronized void removeConnection(int url) { urls.remove(url); } } From 5ca2b07796a3c95f8ff322c00065c1ada6e759c4 Mon Sep 17 00:00:00 2001 From: kyle-fitzp Date: Tue, 5 May 2026 13:28:42 -0500 Subject: [PATCH 05/13] Remove stream path validation and stream key generation (commented out, may be implemented later) --- .../impl/sensor/rtmp/RtmpDriver.java | 186 ++++++++++++++---- .../sensor/rtmp/config/ConnectionConfig.java | 11 ++ 2 files changed, 157 insertions(+), 40 deletions(-) diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java index e9d1efb9e..9add5d993 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java @@ -1,5 +1,12 @@ package org.sensorhub.impl.sensor.rtmp; +import org.bytedeco.ffmpeg.avutil.Callback_Pointer_int_BytePointer_Pointer; +import org.bytedeco.ffmpeg.avutil.Callback_Pointer_int_String_Pointer; +import org.bytedeco.ffmpeg.global.avcodec; +import org.bytedeco.ffmpeg.global.avformat; +import org.bytedeco.javacpp.BytePointer; +import org.bytedeco.javacpp.Pointer; import org.sensorhub.api.common.SensorHubException; +import org.sensorhub.api.module.ModuleEvent; import org.sensorhub.api.sensor.SensorException; import org.sensorhub.impl.sensor.AbstractSensorModule; import org.sensorhub.impl.sensor.ffmpeg.outputs.AudioOutput; @@ -7,16 +14,25 @@ import org.sensorhub.impl.sensor.rtmp.config.HostType; import org.sensorhub.impl.sensor.rtmp.config.RtmpConfig; import org.sensorhub.mpegts.MpegTsProcessor; +import org.sensorhub.utils.Async; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; +import java.security.SecureRandom; +import java.util.HexFormat; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import static org.bytedeco.ffmpeg.global.avutil.AV_LOG_WARNING; +import static org.bytedeco.ffmpeg.global.avutil.av_log_set_callback; + public class RtmpDriver extends AbstractSensorModule { - private static final RtmpUrlArbiter urlArbiter = new RtmpUrlArbiter(); + private static final String COMMAND_LINE_ARGS = "-timeout 0 -listen 1 -username test -password test"; + private static final int EXECUTOR_JOIN_TIMEOUT = 10; + private static final TimeUnit EXECUTOR_JOIN_TIME_UNIT = TimeUnit.SECONDS; + private static final int HEARTBEAT_INTERVAL = 5; + private static final TimeUnit HEARTBEAT_TIME_UNIT = TimeUnit.SECONDS; + private static final int MAX_STARTUP_WAIT_TIME_MS = 5000; + + private static final RtmpPortArbiter portArbiter = new RtmpPortArbiter(); private ExecutorService executorService; private ExecutorService videoExecutorService; private ExecutorService audioExecutorService; @@ -25,7 +41,10 @@ public class RtmpDriver extends AbstractSensorModule { final AtomicReference mpegTsProcessor = new AtomicReference<>(); final AtomicReference> videoOutput = new AtomicReference<>(); final AtomicReference> audioOutput = new AtomicReference<>(); + + int connectionPort = -1; String connectionUrl = ""; + //String path = ""; volatile boolean hasConnected = false; @@ -38,30 +57,31 @@ protected void doInit() throws SensorHubException { generateXmlID("RTMP_", config.serialNumber); } - urlArbiter.removeConnection(connectionUrl); + portArbiter.removeConnection(connectionPort); setConnectionUrl(); - String moduleUid; - if ((moduleUid = urlArbiter.addConnection(connectionUrl, this.getUniqueIdentifier())) != null) { - throw new SensorException("RTMP url already in use by module: " + moduleUid); - } + //createMpegTsProcessor(); + } - createMpegTsProcessor(); + private static String generateStreamKey() { + byte[] bytes = new byte[16]; + new SecureRandom().nextBytes(bytes); + return HexFormat.of().formatHex(bytes); } private void createMpegTsProcessor() { - String commandLineArgs = "-timeout 0 -listen 1"; - - var mpegts = new MpegTsProcessor(connectionUrl, commandLineArgs); + var mpegts = new MpegTsProcessor(connectionUrl, COMMAND_LINE_ARGS/* + " -rtmp_app /live -rtmp_playpath " + path*/); mpegts.setInjectVideoExtradata(true); mpegTsProcessor.set(mpegts); } private void setConnectionUrl() throws SensorException { + var connectionConfig = config.connectionConfig; StringBuilder sb = new StringBuilder("rtmp://"); + if (connectionConfig.host == HostType.OVERRIDE) { if (connectionConfig.hostOverride == null || connectionConfig.hostOverride.isBlank()) { throw new SensorException("Domain override is not set"); @@ -73,14 +93,36 @@ private void setConnectionUrl() throws SensorException { sb.append(":").append(connectionConfig.port); + /* + if (connectionConfig.generateRandomStreamKey) { + connectionConfig.generateRandomStreamKey = false; + String streamKey = generateStreamKey(); + if (!connectionConfig.path.isBlank() && !connectionConfig.path.endsWith("/")) { + connectionConfig.path += "/"; + } + connectionConfig.path += streamKey; + } + if (connectionConfig.path != null && !connectionConfig.path.isBlank()) { if (!connectionConfig.path.startsWith("/")) { - sb.append("/"); + connectionConfig.path = "/" + connectionConfig.path; } - sb.append(connectionConfig.path.trim()); + sb.append(connectionConfig.path); } + path = connectionConfig.path; + + */ connectionUrl = sb.toString(); + connectionPort = connectionConfig.port; + } + + @Override + protected void doStart() throws SensorHubException { + String moduleUid; + if ((moduleUid = portArbiter.addConnection(connectionPort, this.getUniqueIdentifier())) != null) { + throw new SensorException("Port "+ connectionPort + " already in use by module: " + moduleUid); + } } @@ -90,23 +132,52 @@ protected void afterStart() throws SensorHubException { //stopStream(); hasConnected = false; - if (mpegTsProcessor.get() == null) { - createMpegTsProcessor(); - } else if (mpegTsProcessor.get().getState() != Thread.State.NEW) { - stopStream(); + synchronized (mpegTsProcessor) { + var mpegts = mpegTsProcessor.get(); + if (mpegts != null && mpegts.getState() != Thread.State.NEW) { + stopStream(); + } createMpegTsProcessor(); } - stopExecutors(); + try { + stopExecutors(); + } catch (InterruptedException e) { + throw new SensorHubException("Interrupted while stopping executors", e); + } + + reportStatus("Listening on: " + connectionUrl); executorService = Executors.newSingleThreadExecutor(); executorService.submit(this::startStream); heartbeatExecutorService = Executors.newSingleThreadScheduledExecutor(); - heartbeatExecutorService.scheduleAtFixedRate(this::heartbeat, 5, 5, java.util.concurrent.TimeUnit.SECONDS); + heartbeatExecutorService.scheduleAtFixedRate(this::heartbeat, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, HEARTBEAT_TIME_UNIT); heartbeatExecutorService.submit(this::heartbeat); } + private boolean isStopping () { + return getCurrentState() == ModuleEvent.ModuleState.STOPPING || getCurrentState() == ModuleEvent.ModuleState.STOPPED; + } + + /* + private boolean isMatchingPath(String url) { + boolean isMatching = url != null && url.trim().contains(path); + if (!isMatching) { + logger.warn("Received stream on: {} but expected: {}", url, config.connectionConfig.path); + } + return isMatching; + } + + */ + private void startStream() { - reportStatus("Listening on: " + connectionUrl); + // Need to wait for STARTED state so that outputs can be added + try { + Async.waitForCondition(() -> getCurrentState() == ModuleEvent.ModuleState.STARTED, MAX_STARTUP_WAIT_TIME_MS); + } catch (TimeoutException e) { + reportError("Failed to start stream; timed out waiting for startup", e); + return; + } + boolean status; var mpegts = mpegTsProcessor.get(); @@ -115,8 +186,28 @@ private void startStream() { logger.error("Could not start; stream processor is null"); return; } + + /* + // Reject connections that don't match the configured path + // Thread will sit here until a matching connection is made + do { + if (Thread.currentThread().isInterrupted() || isStopping()) { + return; + } + mpegts.closeStream(); + status = mpegts.openStream(); + String path = mpegts.getPrivDataString("rtmp_app") + "/" + mpegts.getPrivDataString("rtmp_playpath"); + } while (!isMatchingPath(path)); + + */ + + mpegts.closeStream(); status = mpegts.openStream(); + if (isStopping()) { + return; + } + if (!status) { String error = "Failed to connect to " + connectionUrl; reportError(error, new SensorException(error)); @@ -127,7 +218,7 @@ private void startStream() { mpegts = mpegTsProcessor.get(); if (mpegts == null) { - logger.error("Could not start; stream processor is null"); + reportError("Stream could not be opened", new SensorException("MpegTs processor is null")); return; } @@ -142,6 +233,9 @@ private void startStream() { mpegts.setAudioDataBufferListener(audioOutput.get()); } + } else { + reportError("Stream could not be opened", new SensorException("RTMP stream connected but not opened")); + return; } clearStatus(); reportStatus("RTMP stream for " + connectionUrl + " opened."); @@ -177,9 +271,8 @@ private void heartbeat() { if (!mpegts.isStreamOpened()) { reportStatus("RTMP stream " + connectionUrl + " lost connection. Reconnecting..."); - if (mpegts.openStream()) { - reportStatus("RTMP stream for " + connectionUrl + " opened."); - } + createMpegTsProcessor(); + startStream(); } } @@ -218,9 +311,18 @@ protected void createAudioOutput(int sampleRate, String codecName) { @Override protected void doStop() throws SensorHubException { super.doStop(); + shutdown(); + } + + private void shutdown() throws SensorHubException { + portArbiter.removeConnection(config.connectionConfig.port); stopStream(); - stopExecutors(); - urlArbiter.removeConnection(connectionUrl); + try { + stopExecutors(); + } catch (InterruptedException e) { + throw new SensorHubException("Interrupted while stopping executors", e); + } + } private void stopStream() { @@ -228,11 +330,13 @@ private void stopStream() { var mpegts = mpegTsProcessor.get(); if (mpegts != null) { mpegts.stopProcessingStream(); - try { - logger.info("Waiting for stream to stop."); - mpegts.join(10000); - } catch (InterruptedException e) { - logger.error("Interrupted while waiting for stream to stop.", e); + if (mpegts.isAlive()) { + try { + logger.info("Waiting for stream to stop."); + mpegts.join(); + } catch (InterruptedException e) { + logger.error("Interrupted while waiting for stream to stop.", e); + } } mpegts.closeStream(); } @@ -240,18 +344,22 @@ private void stopStream() { } } - private void stopExecutors() { + private void stopExecutors() throws InterruptedException { if (executorService != null) { executorService.shutdownNow(); + executorService.awaitTermination(EXECUTOR_JOIN_TIMEOUT, EXECUTOR_JOIN_TIME_UNIT); } if (videoExecutorService != null) { videoExecutorService.shutdownNow(); + videoExecutorService.awaitTermination(EXECUTOR_JOIN_TIMEOUT, EXECUTOR_JOIN_TIME_UNIT); } if (audioExecutorService != null) { audioExecutorService.shutdownNow(); + audioExecutorService.awaitTermination(EXECUTOR_JOIN_TIMEOUT, EXECUTOR_JOIN_TIME_UNIT); } if (heartbeatExecutorService != null) { heartbeatExecutorService.shutdownNow(); + heartbeatExecutorService.awaitTermination(EXECUTOR_JOIN_TIMEOUT, EXECUTOR_JOIN_TIME_UNIT); } } @@ -259,13 +367,11 @@ private void stopExecutors() { @Override public void cleanup() throws SensorHubException { super.cleanup(); - stopStream(); - stopExecutors(); - urlArbiter.removeConnection(connectionUrl); + shutdown(); } @Override public boolean isConnected() { - return isStarted() && mpegTsProcessor.get() != null; + return isStarted() && mpegTsProcessor.get() != null && mpegTsProcessor.get().isStreamOpened(); } } \ No newline at end of file diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java index 46675376a..6afea4569 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java @@ -4,6 +4,14 @@ public class ConnectionConfig { + /* + @DisplayInfo.Required + @DisplayInfo(label = "Generate Random Stream Key", desc = "Enable to generate and append a random hex string to the path. " + + "Recommended for security. Only enable on first init, otherwise path will include multiple keys. ") + public boolean generateRandomStreamKey = true; + + */ + @DisplayInfo.Required @DisplayInfo(label = "Host", desc = "Domain listening for an RTMP connection request. Unspecified should work " + "for most cases.") @@ -18,7 +26,10 @@ public class ConnectionConfig { @DisplayInfo.ValueRange(min = 1, max = 65535) public int port = 1935; + /* @DisplayInfo(label = "Path", desc = "(Optional) Path to listen for an RTMP connection request. I.e. everything in the URL " + "after the port.") public String path = ""; + + */ } \ No newline at end of file From 9edcb76e11d790ffb9cd174d58c340482cf3c09c Mon Sep 17 00:00:00 2001 From: kyle-fitzp Date: Tue, 5 May 2026 13:32:22 -0500 Subject: [PATCH 06/13] Remove host override --- .../java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java | 5 +++++ .../sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java | 6 +----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java index 9add5d993..6e318b1be 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java @@ -82,6 +82,7 @@ private void setConnectionUrl() throws SensorException { StringBuilder sb = new StringBuilder("rtmp://"); + /* if (connectionConfig.host == HostType.OVERRIDE) { if (connectionConfig.hostOverride == null || connectionConfig.hostOverride.isBlank()) { throw new SensorException("Domain override is not set"); @@ -91,6 +92,10 @@ private void setConnectionUrl() throws SensorException { sb.append(connectionConfig.host.host); } + */ + + sb.append(connectionConfig.host.host); + sb.append(":").append(connectionConfig.port); /* diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java index 6afea4569..5bead62ef 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java @@ -17,10 +17,6 @@ public class ConnectionConfig { "for most cases.") public HostType host = HostType.UNSPECIFIED; - @DisplayInfo(label = "Host Override", desc = "(Optional) Override the host listening for an RTMP connection request." - + "\nHost must be set to OVERRIDE.") - public String hostOverride = ""; - @DisplayInfo.Required @DisplayInfo(label = "Port", desc = "Port listening for an RTMP connection request.") @DisplayInfo.ValueRange(min = 1, max = 65535) @@ -30,6 +26,6 @@ public class ConnectionConfig { @DisplayInfo(label = "Path", desc = "(Optional) Path to listen for an RTMP connection request. I.e. everything in the URL " + "after the port.") public String path = ""; - + */ } \ No newline at end of file From ae31dbf037a0d9d1f5c8b2be423769729a4fc3a4 Mon Sep 17 00:00:00 2001 From: kyle-fitzp Date: Tue, 5 May 2026 13:32:22 -0500 Subject: [PATCH 07/13] Remove host override --- .../java/org/sensorhub/impl/sensor/rtmp/config/HostType.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java index c5cca64df..6a8f26c6f 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java @@ -3,8 +3,8 @@ public enum HostType { UNSPECIFIED("0.0.0.0"), LOCALHOST("localhost"), - DOCKER_INTERNAL("host.docker.internal"), - OVERRIDE(""); + DOCKER_INTERNAL("host.docker.internal")/*, + OVERRIDE("")*/; public final String host; From abeba0094243f89628ecc1dd23375ae81a0d801c Mon Sep 17 00:00:00 2001 From: kyle-fitzp Date: Tue, 5 May 2026 14:03:48 -0500 Subject: [PATCH 08/13] Update README --- sensors/video/sensorhub-driver-rtmp/README.md | 39 +++++++++++++++++-- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/sensors/video/sensorhub-driver-rtmp/README.md b/sensors/video/sensorhub-driver-rtmp/README.md index 3f5285797..1e587b7e0 100644 --- a/sensors/video/sensorhub-driver-rtmp/README.md +++ b/sensors/video/sensorhub-driver-rtmp/README.md @@ -1,7 +1,38 @@ -### Axis PTZ (Pan-Tilt-Zoom) Video Camera +# FFmpeg RTMP Driver -OSH sensor adaptor supporting output (video and PTZ settings) and tasking (camera and PTZ) for Axis cameras. +OSH sensor driver using FFmpeg to listen to RTMP streams. This driver depends on the following modules at runtime: - * sensorhub-driver-rtpcam - * sensorhub-driver-videocam +* sensorhub-driver-ffmpeg + +## Config + +### Connection + +* **Host** + - RTMP server host + - **UNSPECIFIED**: listen for remote connections. + - **LOCALHOST**: listen for local connections. + - **DOCKER_INTERNAL**: listen for connections inside a docker container. +* **Port** + - RTMP server port + - 1935 is the default port for RTMP. + - Any port may be used as long as it is not in use by another listener (or any other application). + - RTMP sensor driver modules track ports in use. To release ownership of a port, STOP the module. + +## Usage + +### Initialization + +

To initialize the driver, provide a unique serial number in the configuration. +In connection, set the host and provide an unused port. Once initialized, other RTMP sensor driver modules cannot +use the same port. +

Stop the module to free the port.

+ +### Listening + +

Once initialized, start the module to begin listening for RTMP connections. After the driver +is started, begin publishing the RTMP stream. +

Note: The module does NOT attempt to validate the +username, password, RTMP app, or RTMP playpath. Only the host and port need to match.

+ From b35d27532e94541cf614e801d4c40c49370fec2b Mon Sep 17 00:00:00 2001 From: kyle-fitzp Date: Wed, 6 May 2026 13:08:08 -0500 Subject: [PATCH 09/13] Use singleton pattern to track ports, add javadoc and license --- .../sensorhub/impl/sensor/rtmp/Activator.java | 12 ++ .../impl/sensor/rtmp/RtmpDescriptor.java | 21 ++- .../impl/sensor/rtmp/RtmpDriver.java | 142 ++++++++++++++++-- .../impl/sensor/rtmp/RtmpPortArbiter.java | 22 --- .../impl/sensor/rtmp/RtmpPortSingleton.java | 65 ++++++++ .../sensor/rtmp/config/ConnectionConfig.java | 12 ++ .../impl/sensor/rtmp/config/HostType.java | 12 ++ .../impl/sensor/rtmp/config/RtmpConfig.java | 12 ++ 8 files changed, 248 insertions(+), 50 deletions(-) delete mode 100644 sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortArbiter.java create mode 100644 sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortSingleton.java diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/Activator.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/Activator.java index 8efb94e8b..8f3900c58 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/Activator.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/Activator.java @@ -1,3 +1,15 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2026 GeoRobotix Innovative Research, Inc. All Rights Reserved. + ******************************* END LICENSE BLOCK ***************************/ + package org.sensorhub.impl.sensor.rtmp; import org.osgi.framework.BundleActivator; diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDescriptor.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDescriptor.java index 73724e3c4..85396aed1 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDescriptor.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDescriptor.java @@ -1,17 +1,14 @@ /***************************** BEGIN LICENSE BLOCK *************************** + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. -The contents of this file are subject to the Mozilla Public License, v. 2.0. -If a copy of the MPL was not distributed with this file, You can obtain one -at http://mozilla.org/MPL/2.0/. - -Software distributed under the License is distributed on an "AS IS" basis, -WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License -for the specific language governing rights and limitations under the License. - -The Initial Developer is Botts Innovative Research Inc.. Portions created by the Initial -Developer are Copyright (C) 2014 the Initial Developer. All Rights Reserved. - -******************************* END LICENSE BLOCK ***************************/ + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2026 GeoRobotix Innovative Research, Inc. All Rights Reserved. + ******************************* END LICENSE BLOCK ***************************/ package org.sensorhub.impl.sensor.rtmp; diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java index 6e318b1be..1c796d675 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java @@ -1,17 +1,23 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2026 GeoRobotix Innovative Research, Inc. All Rights Reserved. + ******************************* END LICENSE BLOCK ***************************/ + package org.sensorhub.impl.sensor.rtmp; -import org.bytedeco.ffmpeg.avutil.Callback_Pointer_int_BytePointer_Pointer; -import org.bytedeco.ffmpeg.avutil.Callback_Pointer_int_String_Pointer; -import org.bytedeco.ffmpeg.global.avcodec; -import org.bytedeco.ffmpeg.global.avformat; -import org.bytedeco.javacpp.BytePointer; -import org.bytedeco.javacpp.Pointer; + import org.sensorhub.api.common.SensorHubException; import org.sensorhub.api.module.ModuleEvent; import org.sensorhub.api.sensor.SensorException; import org.sensorhub.impl.sensor.AbstractSensorModule; import org.sensorhub.impl.sensor.ffmpeg.outputs.AudioOutput; import org.sensorhub.impl.sensor.ffmpeg.outputs.VideoOutput; -import org.sensorhub.impl.sensor.rtmp.config.HostType; import org.sensorhub.impl.sensor.rtmp.config.RtmpConfig; import org.sensorhub.mpegts.MpegTsProcessor; import org.sensorhub.utils.Async; @@ -21,9 +27,21 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; -import static org.bytedeco.ffmpeg.global.avutil.AV_LOG_WARNING; import static org.bytedeco.ffmpeg.global.avutil.av_log_set_callback; +/** + * OpenSensorHub sensor module that listens for and processes RTMP streams. + *

+ * The driver creates an FFmpeg-backed MPEG-TS processor configured as an RTMP + * listener. Once a publisher connects, the driver detects available audio and + * video streams, creates matching OpenSensorHub outputs, and forwards stream + * data through those outputs. + *

+ *

+ * Only one RTMP driver instance may use a given port at a time. Port ownership + * is tracked through a shared {@link RtmpPortSingleton}. + *

+ */ public class RtmpDriver extends AbstractSensorModule { private static final String COMMAND_LINE_ARGS = "-timeout 0 -listen 1 -username test -password test"; private static final int EXECUTOR_JOIN_TIMEOUT = 10; @@ -32,7 +50,7 @@ public class RtmpDriver extends AbstractSensorModule { private static final TimeUnit HEARTBEAT_TIME_UNIT = TimeUnit.SECONDS; private static final int MAX_STARTUP_WAIT_TIME_MS = 5000; - private static final RtmpPortArbiter portArbiter = new RtmpPortArbiter(); + private final RtmpPortSingleton portSingleton = RtmpPortSingleton.getInstance(); private ExecutorService executorService; private ExecutorService videoExecutorService; private ExecutorService audioExecutorService; @@ -47,7 +65,17 @@ public class RtmpDriver extends AbstractSensorModule { //String path = ""; volatile boolean hasConnected = false; - + /** + * Initializes the driver configuration and generated identifiers. + *

+ * If no unique identifier has been assigned, this method generates both the + * OpenSensorHub unique identifier and XML identifier from the configured + * serial number. It also releases any previously tracked port and rebuilds + * the RTMP listener URL from the current configuration. + *

+ * + * @throws SensorHubException if initialization fails + */ @Override protected void doInit() throws SensorHubException { super.doInit(); @@ -57,7 +85,7 @@ protected void doInit() throws SensorHubException { generateXmlID("RTMP_", config.serialNumber); } - portArbiter.removeConnection(connectionPort); + portSingleton.removeConnection(connectionPort); setConnectionUrl(); @@ -70,12 +98,21 @@ private static String generateStreamKey() { return HexFormat.of().formatHex(bytes); } + /** + * Creates and stores the FFmpeg-backed MPEG-TS processor for the configured + * RTMP listener URL. + */ private void createMpegTsProcessor() { var mpegts = new MpegTsProcessor(connectionUrl, COMMAND_LINE_ARGS/* + " -rtmp_app /live -rtmp_playpath " + path*/); mpegts.setInjectVideoExtradata(true); mpegTsProcessor.set(mpegts); } + /** + * Builds the RTMP listener URL from the configured host and port. + * + * @throws SensorException if the connection configuration is invalid + */ private void setConnectionUrl() throws SensorException { var connectionConfig = config.connectionConfig; @@ -122,15 +159,30 @@ private void setConnectionUrl() throws SensorException { connectionPort = connectionConfig.port; } + /** + * Starts the driver by reserving the configured RTMP port. + * + * @throws SensorHubException if the configured port is already in use by + * another RTMP driver module + */ @Override protected void doStart() throws SensorHubException { String moduleUid; - if ((moduleUid = portArbiter.addConnection(connectionPort, this.getUniqueIdentifier())) != null) { + if ((moduleUid = portSingleton.addConnection(connectionPort, this.getUniqueIdentifier())) != null) { throw new SensorException("Port "+ connectionPort + " already in use by module: " + moduleUid); } } - + /** + * Performs post-start setup for RTMP listening and heartbeat monitoring. + *

+ * This method creates a fresh MPEG-TS processor, stops any previous executor + * services, reports the listening URL, starts the stream listener thread, and + * schedules periodic heartbeat checks. + *

+ * + * @throws SensorHubException if executor shutdown is interrupted + */ @Override protected void afterStart() throws SensorHubException { super.afterStart(); @@ -159,6 +211,11 @@ protected void afterStart() throws SensorHubException { heartbeatExecutorService.submit(this::heartbeat); } + /** + * Determines whether the module is currently stopping or stopped. + * + * @return {@code true} if the module is stopping or stopped; otherwise {@code false} + */ private boolean isStopping () { return getCurrentState() == ModuleEvent.ModuleState.STOPPING || getCurrentState() == ModuleEvent.ModuleState.STOPPED; } @@ -174,6 +231,15 @@ private boolean isMatchingPath(String url) { */ + /** + * Opens the RTMP listener, waits for an incoming stream, creates audio/video + * outputs for detected streams, and begins processing stream data. + *

+ * The method waits until the module reaches the {@code STARTED} state before + * adding outputs. If the stream cannot be opened, an error is reported and + * processing is not started. + *

+ */ private void startStream() { // Need to wait for STARTED state so that outputs can be added try { @@ -270,6 +336,10 @@ private void startStream() { } } + /** + * Checks the active stream and attempts to reconnect if a previously + * connected stream has been lost. + */ private void heartbeat() { var mpegts = mpegTsProcessor.get(); if (mpegts == null || !hasConnected) { return; } @@ -281,6 +351,12 @@ private void heartbeat() { } } + /** + * Creates and registers the video output for the detected RTMP video stream. + * + * @param videoDims video frame dimensions, usually width and height + * @param codecName name of the detected video codec + */ protected void createVideoOutput(int[] videoDims, String codecName) { synchronized (videoOutput) { var videoOut = new VideoOutput<>(this, videoDims, codecName); @@ -297,6 +373,12 @@ protected void createVideoOutput(int[] videoDims, String codecName) { } } + /** + * Creates and registers the audio output for the detected RTMP audio stream. + * + * @param sampleRate detected audio sample rate in hertz + * @param codecName name of the detected audio codec + */ protected void createAudioOutput(int sampleRate, String codecName) { synchronized (audioOutput) { var audioOut = new AudioOutput<>(this, sampleRate, codecName); @@ -312,15 +394,24 @@ protected void createAudioOutput(int sampleRate, String codecName) { } } - + /** + * Stops the driver and releases all RTMP stream resources. + * + * @throws SensorHubException if shutdown fails + */ @Override protected void doStop() throws SensorHubException { super.doStop(); shutdown(); } + /** + * Releases the reserved port, stops the stream, and terminates executor services. + * + * @throws SensorHubException if executor shutdown is interrupted + */ private void shutdown() throws SensorHubException { - portArbiter.removeConnection(config.connectionConfig.port); + portSingleton.removeConnection(config.connectionConfig.port); stopStream(); try { stopExecutors(); @@ -330,6 +421,10 @@ private void shutdown() throws SensorHubException { } + /** + * Stops stream processing, waits for the MPEG-TS processor thread to finish, + * closes the stream, and clears the processor reference. + */ private void stopStream() { synchronized (mpegTsProcessor) { var mpegts = mpegTsProcessor.get(); @@ -349,6 +444,11 @@ private void stopStream() { } } + /** + * Stops all executor services used by the driver and waits for termination. + * + * @throws InterruptedException if interrupted while waiting for executor termination + */ private void stopExecutors() throws InterruptedException { if (executorService != null) { executorService.shutdownNow(); @@ -368,13 +468,23 @@ private void stopExecutors() throws InterruptedException { } } - + /** + * Cleans up module resources before disposal. + * + * @throws SensorHubException if cleanup or shutdown fails + */ @Override public void cleanup() throws SensorHubException { super.cleanup(); shutdown(); } + /** + * Indicates whether the driver is currently started and has an open RTMP stream. + * + * @return {@code true} if the module is started and the RTMP stream is open; + * otherwise {@code false} + */ @Override public boolean isConnected() { return isStarted() && mpegTsProcessor.get() != null && mpegTsProcessor.get().isStreamOpened(); diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortArbiter.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortArbiter.java deleted file mode 100644 index ff9433ddc..000000000 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortArbiter.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.sensorhub.impl.sensor.rtmp; - -import java.util.HashMap; -import java.util.Map; - -public class RtmpPortArbiter { - private final Map urls = new HashMap<>(); - - // If successful, returns null, otherwise returns the moduleUid of the existing connection - public synchronized String addConnection(int url, String moduleUid) { - if (urls.containsKey(url)) { - return urls.get(url); - } else { - urls.put(url, moduleUid); - return null; - } - } - - public synchronized void removeConnection(int url) { - urls.remove(url); - } -} diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortSingleton.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortSingleton.java new file mode 100644 index 000000000..481616f6d --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortSingleton.java @@ -0,0 +1,65 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2026 GeoRobotix Innovative Research, Inc. All Rights Reserved. + ******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.sensor.rtmp; + +import java.util.HashMap; +import java.util.Map; + +/** + * Tracks RTMP listener ports currently reserved by RTMP driver modules. + *

+ * The singleton prevents multiple RTMP driver instances from attempting to listen + * on the same port at the same time. All public methods are synchronized to + * provide simple thread-safe access to the port reservation map. + *

+ */ +public final class RtmpPortSingleton { + private static final RtmpPortSingleton instance = new RtmpPortSingleton(); + + private final Map urls = new HashMap<>(); + + public static RtmpPortSingleton getInstance() { + return instance; + } + + /** + * Attempts to reserve a port for the specified module. + *

+ * If the port is not already reserved, this method records the module unique + * identifier and returns {@code null}. If the port is already reserved, this + * method returns the unique identifier of the module that currently owns it. + *

+ * + * @param url RTMP listener port to reserve + * @param moduleUid unique identifier of the module requesting the port + * @return {@code null} if the reservation succeeded; otherwise the unique + * identifier of the module currently using the port + */ + public synchronized String addConnection(int url, String moduleUid) { + if (instance.urls.containsKey(url)) { + return urls.get(url); + } else { + instance.urls.put(url, moduleUid); + return null; + } + } + + /** + * Releases a previously reserved RTMP listener port. + * + * @param url RTMP listener port to release + */ + public synchronized void removeConnection(int url) { + instance.urls.remove(url); + } +} diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java index 5bead62ef..5f8900d5f 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java @@ -1,3 +1,15 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2026 GeoRobotix Innovative Research, Inc. All Rights Reserved. + ******************************* END LICENSE BLOCK ***************************/ + package org.sensorhub.impl.sensor.rtmp.config; import org.sensorhub.api.config.DisplayInfo; diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java index 6a8f26c6f..cd72f7f5b 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java @@ -1,3 +1,15 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2026 GeoRobotix Innovative Research, Inc. All Rights Reserved. + ******************************* END LICENSE BLOCK ***************************/ + package org.sensorhub.impl.sensor.rtmp.config; public enum HostType { diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/RtmpConfig.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/RtmpConfig.java index ffc45e138..d466a4ba0 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/RtmpConfig.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/RtmpConfig.java @@ -1,3 +1,15 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2026 GeoRobotix Innovative Research, Inc. All Rights Reserved. + ******************************* END LICENSE BLOCK ***************************/ + package org.sensorhub.impl.sensor.rtmp.config; import org.sensorhub.api.config.DisplayInfo; From a219904f7b6bdc7d146670a6d73ef8f46cfc1f66 Mon Sep 17 00:00:00 2001 From: kyle-fitzp Date: Wed, 6 May 2026 13:10:27 -0500 Subject: [PATCH 10/13] Corrected organization, url --- sensors/video/sensorhub-driver-rtmp/build.gradle | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sensors/video/sensorhub-driver-rtmp/build.gradle b/sensors/video/sensorhub-driver-rtmp/build.gradle index 133071bd2..7a50331ae 100644 --- a/sensors/video/sensorhub-driver-rtmp/build.gradle +++ b/sensors/video/sensorhub-driver-rtmp/build.gradle @@ -10,7 +10,7 @@ dependencies { // add info to OSGi manifest osgi { manifest { - attributes('Bundle-Vendor': 'Botts Innovative Research, Inc.') + attributes('Bundle-Vendor': 'GeoRobotix Innovative Research, LLC.') attributes('Bundle-Activator': 'org.sensorhub.impl.sensor.rtmp.Activator') } } @@ -21,8 +21,8 @@ ext.pom >>= { developer { id 'kyle-fitzp' name 'Kyle Fitzpatrick' - organization 'GeoRobotix Inc.' - organizationUrl 'http://www.georobotix.com' + organization 'GeoRobotix Innovative Research, LLC.' + organizationUrl 'https://georobotix.us' } } } From d2909177d5e8f223373d5fc8e9af704f007e7770 Mon Sep 17 00:00:00 2001 From: kyle-fitzp Date: Wed, 6 May 2026 13:17:20 -0500 Subject: [PATCH 11/13] Simplified isConnected method --- .../org/sensorhub/impl/sensor/rtmp/RtmpDriver.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java index 1c796d675..f28a7f409 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java @@ -63,8 +63,17 @@ public class RtmpDriver extends AbstractSensorModule { int connectionPort = -1; String connectionUrl = ""; //String path = ""; + + /** + * Indicates whether the driver has successfully connected to an RTMP stream at least once since starting. + */ volatile boolean hasConnected = false; + /** + * Indicates whether the driver is currently connected to an RTMP stream. + */ + volatile boolean isConnected = false; + /** * Initializes the driver configuration and generated identifiers. *

@@ -311,6 +320,7 @@ private void startStream() { clearStatus(); reportStatus("RTMP stream for " + connectionUrl + " opened."); hasConnected = true; + isConnected = true; mpegts.processStream(); /* executorService.submit(() -> { @@ -346,6 +356,7 @@ private void heartbeat() { if (!mpegts.isStreamOpened()) { reportStatus("RTMP stream " + connectionUrl + " lost connection. Reconnecting..."); + isConnected = false; createMpegTsProcessor(); startStream(); } @@ -426,6 +437,7 @@ private void shutdown() throws SensorHubException { * closes the stream, and clears the processor reference. */ private void stopStream() { + isConnected = false; synchronized (mpegTsProcessor) { var mpegts = mpegTsProcessor.get(); if (mpegts != null) { @@ -487,6 +499,6 @@ public void cleanup() throws SensorHubException { */ @Override public boolean isConnected() { - return isStarted() && mpegTsProcessor.get() != null && mpegTsProcessor.get().isStreamOpened(); + return isConnected; } } \ No newline at end of file From 9c25778486de9b8b42ecf95e04233417a5b42da3 Mon Sep 17 00:00:00 2001 From: kyle-fitzp Date: Wed, 6 May 2026 13:20:11 -0500 Subject: [PATCH 12/13] Remove unnecessary references to static instance --- .../org/sensorhub/impl/sensor/rtmp/RtmpPortSingleton.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortSingleton.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortSingleton.java index 481616f6d..1f1bc4098 100644 --- a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortSingleton.java +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortSingleton.java @@ -46,10 +46,10 @@ public static RtmpPortSingleton getInstance() { * identifier of the module currently using the port */ public synchronized String addConnection(int url, String moduleUid) { - if (instance.urls.containsKey(url)) { + if (urls.containsKey(url)) { return urls.get(url); } else { - instance.urls.put(url, moduleUid); + urls.put(url, moduleUid); return null; } } @@ -60,6 +60,6 @@ public synchronized String addConnection(int url, String moduleUid) { * @param url RTMP listener port to release */ public synchronized void removeConnection(int url) { - instance.urls.remove(url); + urls.remove(url); } } From c528b479e9d164e1a4981f29bfacffc06a5c3d44 Mon Sep 17 00:00:00 2001 From: kyle-fitzp Date: Wed, 6 May 2026 13:37:36 -0500 Subject: [PATCH 13/13] Fix organization --- sensors/video/sensorhub-driver-rtmp/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sensors/video/sensorhub-driver-rtmp/build.gradle b/sensors/video/sensorhub-driver-rtmp/build.gradle index 7a50331ae..e0872f41d 100644 --- a/sensors/video/sensorhub-driver-rtmp/build.gradle +++ b/sensors/video/sensorhub-driver-rtmp/build.gradle @@ -10,7 +10,7 @@ dependencies { // add info to OSGi manifest osgi { manifest { - attributes('Bundle-Vendor': 'GeoRobotix Innovative Research, LLC.') + attributes('Bundle-Vendor': 'GeoRobotix Innovative Research') attributes('Bundle-Activator': 'org.sensorhub.impl.sensor.rtmp.Activator') } } @@ -21,7 +21,7 @@ ext.pom >>= { developer { id 'kyle-fitzp' name 'Kyle Fitzpatrick' - organization 'GeoRobotix Innovative Research, LLC.' + organization 'GeoRobotix Innovative Research' organizationUrl 'https://georobotix.us' } }