diff --git a/sensors/video/sensorhub-driver-ffmpeg/src/main/java/org/sensorhub/mpegts/MpegTsProcessor.java b/sensors/video/sensorhub-driver-ffmpeg/src/main/java/org/sensorhub/mpegts/MpegTsProcessor.java index c3da173af..2de567d7a 100644 --- a/sensors/video/sensorhub-driver-ffmpeg/src/main/java/org/sensorhub/mpegts/MpegTsProcessor.java +++ b/sensors/video/sensorhub-driver-ffmpeg/src/main/java/org/sensorhub/mpegts/MpegTsProcessor.java @@ -14,12 +14,14 @@ import org.bytedeco.ffmpeg.avcodec.AVCodecParameters; import org.bytedeco.ffmpeg.avcodec.AVPacket; import org.bytedeco.ffmpeg.avformat.AVFormatContext; +import org.bytedeco.ffmpeg.avformat.AVIOInterruptCB; import org.bytedeco.ffmpeg.avformat.AVInputFormat; import org.bytedeco.ffmpeg.avutil.AVDictionary; import org.bytedeco.ffmpeg.avutil.AVRational; import org.bytedeco.ffmpeg.global.avcodec; import org.bytedeco.ffmpeg.global.avformat; import org.bytedeco.ffmpeg.global.avutil; +import org.bytedeco.javacpp.Pointer; import org.bytedeco.javacpp.PointerPointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,8 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.bytedeco.ffmpeg.global.avdevice.avdevice_register_all; -import static org.bytedeco.ffmpeg.global.avformat.av_find_input_format; -import static org.bytedeco.ffmpeg.global.avformat.av_read_frame; +import static org.bytedeco.ffmpeg.global.avformat.*; /** * The class provides a wrapper to bytedeco.org JavaCpp-Platform. @@ -98,6 +99,13 @@ public class MpegTsProcessor extends Thread { */ private AVFormatContext avFormatContext; + /** + * Callbacks allow some blocking AVFormatContext operations to terminate early on + * stream close. These are global for GC purposes; do not directly modify. + */ + private AVIOInterruptCB.Callback_Pointer callbackPointer; + private AVIOInterruptCB interruptCallback; + /** * Flag indicating if processing of the transport stream should be terminated. */ @@ -186,7 +194,17 @@ public boolean openStream() { avformat.avformat_network_init(); // Create a new AV Format Context for I/O - avFormatContext = new AVFormatContext(null); + avFormatContext = avformat_alloc_context(); + + // Allow for context to be interrupted after attempt to stop stream + interruptCallback = avFormatContext.interrupt_callback(); + callbackPointer = new AVIOInterruptCB.Callback_Pointer() { + @Override + public int call(Pointer opaque) { + return terminateProcessing.get() ? 1 : 0; + } + }; + interruptCallback.callback(callbackPointer); AVInputFormat inputFormat = null; @@ -526,11 +544,8 @@ public void closeStream() { */ public void stopProcessingStream() { logger.debug("stopProcessingStream"); - - if (streamOpened) { - loop = false; - terminateProcessing.set(true); - } + loop = false; + terminateProcessing.set(true); } /** diff --git a/sensors/video/sensorhub-driver-rtmp/README.md b/sensors/video/sensorhub-driver-rtmp/README.md new file mode 100644 index 000000000..1e587b7e0 --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/README.md @@ -0,0 +1,38 @@ +# FFmpeg RTMP Driver + +OSH sensor driver using FFmpeg to listen to RTMP streams. + +This driver depends on the following modules at runtime: +* sensorhub-driver-ffmpeg + +## Config + +### Connection + +* **Host** + - RTMP server host + - **UNSPECIFIED**: listen for remote connections. + - **LOCALHOST**: listen for local connections. + - **DOCKER_INTERNAL**: listen for connections inside a docker container. +* **Port** + - RTMP server port + - 1935 is the default port for RTMP. + - Any port may be used as long as it is not in use by another listener (or any other application). + - RTMP sensor driver modules track ports in use. To release ownership of a port, STOP the module. + +## Usage + +### Initialization + +

To initialize the driver, provide a unique serial number in the configuration. +In connection, set the host and provide an unused port. Once initialized, other RTMP sensor driver modules cannot +use the same port. +

Stop the module to free the port.

+ +### Listening + +

Once initialized, start the module to begin listening for RTMP connections. After the driver +is started, begin publishing the RTMP stream. +

Note: The module does NOT attempt to validate the +username, password, RTMP app, or RTMP playpath. Only the host and port need to match.

+ diff --git a/sensors/video/sensorhub-driver-rtmp/build.gradle b/sensors/video/sensorhub-driver-rtmp/build.gradle new file mode 100644 index 000000000..e0872f41d --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/build.gradle @@ -0,0 +1,28 @@ +description = 'RTMP Video Driver' +ext.details = 'Driver to listen for and demux RTMP video streams' +version = '1.0.0' + +dependencies { + implementation 'org.sensorhub:sensorhub-core:' + oshCoreVersion + implementation project(':sensorhub-driver-ffmpeg') +} + +// add info to OSGi manifest +osgi { + manifest { + attributes('Bundle-Vendor': 'GeoRobotix Innovative Research') + attributes('Bundle-Activator': 'org.sensorhub.impl.sensor.rtmp.Activator') + } +} + +// add info to maven pom +ext.pom >>= { + developers { + developer { + id 'kyle-fitzp' + name 'Kyle Fitzpatrick' + organization 'GeoRobotix Innovative Research' + organizationUrl 'https://georobotix.us' + } + } +} diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/Activator.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/Activator.java new file mode 100644 index 000000000..8f3900c58 --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/Activator.java @@ -0,0 +1,25 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2026 GeoRobotix Innovative Research, Inc. All Rights Reserved. + ******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.sensor.rtmp; + +import org.osgi.framework.BundleActivator; +import org.sensorhub.utils.OshBundleActivator; + + +/* + * Needed to expose java services as OSGi services + */ +public class Activator extends OshBundleActivator implements BundleActivator +{ + +} diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDescriptor.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDescriptor.java new file mode 100644 index 000000000..85396aed1 --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDescriptor.java @@ -0,0 +1,38 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2026 GeoRobotix Innovative Research, Inc. All Rights Reserved. + ******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.sensor.rtmp; + +import org.sensorhub.api.module.IModule; +import org.sensorhub.api.module.IModuleProvider; +import org.sensorhub.api.module.ModuleConfig; +import org.sensorhub.impl.module.JarModuleProvider; +import org.sensorhub.impl.sensor.rtmp.config.RtmpConfig; + + +public class RtmpDescriptor extends JarModuleProvider implements IModuleProvider +{ + + @Override + public Class> getModuleClass() + { + return RtmpDriver.class; + } + + + @Override + public Class getModuleConfigClass() + { + return RtmpConfig.class; + } + +} diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java new file mode 100644 index 000000000..f28a7f409 --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpDriver.java @@ -0,0 +1,504 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2026 GeoRobotix Innovative Research, Inc. All Rights Reserved. + ******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.sensor.rtmp; + +import org.sensorhub.api.common.SensorHubException; +import org.sensorhub.api.module.ModuleEvent; +import org.sensorhub.api.sensor.SensorException; +import org.sensorhub.impl.sensor.AbstractSensorModule; +import org.sensorhub.impl.sensor.ffmpeg.outputs.AudioOutput; +import org.sensorhub.impl.sensor.ffmpeg.outputs.VideoOutput; +import org.sensorhub.impl.sensor.rtmp.config.RtmpConfig; +import org.sensorhub.mpegts.MpegTsProcessor; +import org.sensorhub.utils.Async; + +import java.security.SecureRandom; +import java.util.HexFormat; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; + +import static org.bytedeco.ffmpeg.global.avutil.av_log_set_callback; + +/** + * OpenSensorHub sensor module that listens for and processes RTMP streams. + *

+ * The driver creates an FFmpeg-backed MPEG-TS processor configured as an RTMP + * listener. Once a publisher connects, the driver detects available audio and + * video streams, creates matching OpenSensorHub outputs, and forwards stream + * data through those outputs. + *

+ *

+ * Only one RTMP driver instance may use a given port at a time. Port ownership + * is tracked through a shared {@link RtmpPortSingleton}. + *

+ */ +public class RtmpDriver extends AbstractSensorModule { + private static final String COMMAND_LINE_ARGS = "-timeout 0 -listen 1 -username test -password test"; + private static final int EXECUTOR_JOIN_TIMEOUT = 10; + private static final TimeUnit EXECUTOR_JOIN_TIME_UNIT = TimeUnit.SECONDS; + private static final int HEARTBEAT_INTERVAL = 5; + private static final TimeUnit HEARTBEAT_TIME_UNIT = TimeUnit.SECONDS; + private static final int MAX_STARTUP_WAIT_TIME_MS = 5000; + + private final RtmpPortSingleton portSingleton = RtmpPortSingleton.getInstance(); + private ExecutorService executorService; + private ExecutorService videoExecutorService; + private ExecutorService audioExecutorService; + private ScheduledExecutorService heartbeatExecutorService; + + final AtomicReference mpegTsProcessor = new AtomicReference<>(); + final AtomicReference> videoOutput = new AtomicReference<>(); + final AtomicReference> audioOutput = new AtomicReference<>(); + + int connectionPort = -1; + String connectionUrl = ""; + //String path = ""; + + /** + * Indicates whether the driver has successfully connected to an RTMP stream at least once since starting. + */ + volatile boolean hasConnected = false; + + /** + * Indicates whether the driver is currently connected to an RTMP stream. + */ + volatile boolean isConnected = false; + + /** + * Initializes the driver configuration and generated identifiers. + *

+ * If no unique identifier has been assigned, this method generates both the + * OpenSensorHub unique identifier and XML identifier from the configured + * serial number. It also releases any previously tracked port and rebuilds + * the RTMP listener URL from the current configuration. + *

+ * + * @throws SensorHubException if initialization fails + */ + @Override + protected void doInit() throws SensorHubException { + super.doInit(); + + if (getUniqueIdentifier() == null) { + generateUniqueID("urn:osh:sensor:rtmp:", config.serialNumber); + generateXmlID("RTMP_", config.serialNumber); + } + + portSingleton.removeConnection(connectionPort); + + setConnectionUrl(); + + //createMpegTsProcessor(); + } + + private static String generateStreamKey() { + byte[] bytes = new byte[16]; + new SecureRandom().nextBytes(bytes); + return HexFormat.of().formatHex(bytes); + } + + /** + * Creates and stores the FFmpeg-backed MPEG-TS processor for the configured + * RTMP listener URL. + */ + private void createMpegTsProcessor() { + var mpegts = new MpegTsProcessor(connectionUrl, COMMAND_LINE_ARGS/* + " -rtmp_app /live -rtmp_playpath " + path*/); + mpegts.setInjectVideoExtradata(true); + mpegTsProcessor.set(mpegts); + } + + /** + * Builds the RTMP listener URL from the configured host and port. + * + * @throws SensorException if the connection configuration is invalid + */ + private void setConnectionUrl() throws SensorException { + + var connectionConfig = config.connectionConfig; + StringBuilder sb = new StringBuilder("rtmp://"); + + + /* + if (connectionConfig.host == HostType.OVERRIDE) { + if (connectionConfig.hostOverride == null || connectionConfig.hostOverride.isBlank()) { + throw new SensorException("Domain override is not set"); + } + sb.append(connectionConfig.hostOverride); + } else { + sb.append(connectionConfig.host.host); + } + + */ + + sb.append(connectionConfig.host.host); + + sb.append(":").append(connectionConfig.port); + + /* + if (connectionConfig.generateRandomStreamKey) { + connectionConfig.generateRandomStreamKey = false; + String streamKey = generateStreamKey(); + if (!connectionConfig.path.isBlank() && !connectionConfig.path.endsWith("/")) { + connectionConfig.path += "/"; + } + connectionConfig.path += streamKey; + } + + if (connectionConfig.path != null && !connectionConfig.path.isBlank()) { + if (!connectionConfig.path.startsWith("/")) { + connectionConfig.path = "/" + connectionConfig.path; + } + sb.append(connectionConfig.path); + } + + path = connectionConfig.path; + + */ + connectionUrl = sb.toString(); + connectionPort = connectionConfig.port; + } + + /** + * Starts the driver by reserving the configured RTMP port. + * + * @throws SensorHubException if the configured port is already in use by + * another RTMP driver module + */ + @Override + protected void doStart() throws SensorHubException { + String moduleUid; + if ((moduleUid = portSingleton.addConnection(connectionPort, this.getUniqueIdentifier())) != null) { + throw new SensorException("Port "+ connectionPort + " already in use by module: " + moduleUid); + } + } + + /** + * Performs post-start setup for RTMP listening and heartbeat monitoring. + *

+ * This method creates a fresh MPEG-TS processor, stops any previous executor + * services, reports the listening URL, starts the stream listener thread, and + * schedules periodic heartbeat checks. + *

+ * + * @throws SensorHubException if executor shutdown is interrupted + */ + @Override + protected void afterStart() throws SensorHubException { + super.afterStart(); + //stopStream(); + hasConnected = false; + + synchronized (mpegTsProcessor) { + var mpegts = mpegTsProcessor.get(); + if (mpegts != null && mpegts.getState() != Thread.State.NEW) { + stopStream(); + } + createMpegTsProcessor(); + } + + try { + stopExecutors(); + } catch (InterruptedException e) { + throw new SensorHubException("Interrupted while stopping executors", e); + } + + reportStatus("Listening on: " + connectionUrl); + executorService = Executors.newSingleThreadExecutor(); + executorService.submit(this::startStream); + heartbeatExecutorService = Executors.newSingleThreadScheduledExecutor(); + heartbeatExecutorService.scheduleAtFixedRate(this::heartbeat, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, HEARTBEAT_TIME_UNIT); + heartbeatExecutorService.submit(this::heartbeat); + } + + /** + * Determines whether the module is currently stopping or stopped. + * + * @return {@code true} if the module is stopping or stopped; otherwise {@code false} + */ + private boolean isStopping () { + return getCurrentState() == ModuleEvent.ModuleState.STOPPING || getCurrentState() == ModuleEvent.ModuleState.STOPPED; + } + + /* + private boolean isMatchingPath(String url) { + boolean isMatching = url != null && url.trim().contains(path); + if (!isMatching) { + logger.warn("Received stream on: {} but expected: {}", url, config.connectionConfig.path); + } + return isMatching; + } + + */ + + /** + * Opens the RTMP listener, waits for an incoming stream, creates audio/video + * outputs for detected streams, and begins processing stream data. + *

+ * The method waits until the module reaches the {@code STARTED} state before + * adding outputs. If the stream cannot be opened, an error is reported and + * processing is not started. + *

+ */ + private void startStream() { + // Need to wait for STARTED state so that outputs can be added + try { + Async.waitForCondition(() -> getCurrentState() == ModuleEvent.ModuleState.STARTED, MAX_STARTUP_WAIT_TIME_MS); + } catch (TimeoutException e) { + reportError("Failed to start stream; timed out waiting for startup", e); + return; + } + + boolean status; + + var mpegts = mpegTsProcessor.get(); + + if (mpegts == null) { + logger.error("Could not start; stream processor is null"); + return; + } + + /* + // Reject connections that don't match the configured path + // Thread will sit here until a matching connection is made + do { + if (Thread.currentThread().isInterrupted() || isStopping()) { + return; + } + mpegts.closeStream(); + status = mpegts.openStream(); + String path = mpegts.getPrivDataString("rtmp_app") + "/" + mpegts.getPrivDataString("rtmp_playpath"); + } while (!isMatchingPath(path)); + + */ + + mpegts.closeStream(); + status = mpegts.openStream(); + + if (isStopping()) { + return; + } + + if (!status) { + String error = "Failed to connect to " + connectionUrl; + reportError(error, new SensorException(error)); + return; + } + + synchronized (mpegTsProcessor) { + mpegts = mpegTsProcessor.get(); + + if (mpegts == null) { + reportError("Stream could not be opened", new SensorException("MpegTs processor is null")); + return; + } + + if (mpegts.isStreamOpened()) { + if (mpegts.hasVideoStream()) { + createVideoOutput(mpegts.getVideoStreamFrameDimensions(), mpegts.getVideoCodecName()); + mpegts.setVideoDataBufferListener(videoOutput.get()); + } + + if (mpegts.hasAudioStream()) { + createAudioOutput(mpegts.getAudioSampleRate(), mpegts.getAudioCodecName()); + mpegts.setAudioDataBufferListener(audioOutput.get()); + } + + } else { + reportError("Stream could not be opened", new SensorException("RTMP stream connected but not opened")); + return; + } + clearStatus(); + reportStatus("RTMP stream for " + connectionUrl + " opened."); + hasConnected = true; + isConnected = true; + mpegts.processStream(); + /* + executorService.submit(() -> { + MpegTsProcessor processor; + while ((processor = mpegTsProcessor.get()) != null) { + + while (processor.isStreamOpened()) { + processor.processP(); + } + if (!Thread.currentThread().isInterrupted()) { + reportStatus("RTMP stream " + connectionUrl + " lost connection. Reconnecting..."); + processor.openStream(); + } else { + return; + } + } + reportStatus("RTMP stream closed."); + + }); + + */ + //mpegts.processStream(); + } + } + + /** + * Checks the active stream and attempts to reconnect if a previously + * connected stream has been lost. + */ + private void heartbeat() { + var mpegts = mpegTsProcessor.get(); + if (mpegts == null || !hasConnected) { return; } + + if (!mpegts.isStreamOpened()) { + reportStatus("RTMP stream " + connectionUrl + " lost connection. Reconnecting..."); + isConnected = false; + createMpegTsProcessor(); + startStream(); + } + } + + /** + * Creates and registers the video output for the detected RTMP video stream. + * + * @param videoDims video frame dimensions, usually width and height + * @param codecName name of the detected video codec + */ + protected void createVideoOutput(int[] videoDims, String codecName) { + synchronized (videoOutput) { + var videoOut = new VideoOutput<>(this, videoDims, codecName); + videoOutput.set(videoOut); + + if (videoExecutorService != null) { + videoExecutorService.shutdown(); + } + + videoExecutorService = Executors.newSingleThreadExecutor(); + videoOut.setExecutor(videoExecutorService); + videoOut.doInit(); + addOutput(videoOut, false); + } + } + + /** + * Creates and registers the audio output for the detected RTMP audio stream. + * + * @param sampleRate detected audio sample rate in hertz + * @param codecName name of the detected audio codec + */ + protected void createAudioOutput(int sampleRate, String codecName) { + synchronized (audioOutput) { + var audioOut = new AudioOutput<>(this, sampleRate, codecName); + audioOutput.set(audioOut); + + if (audioExecutorService != null) { + audioExecutorService.shutdown(); + } + audioExecutorService = Executors.newSingleThreadExecutor(); + audioOut.setExecutor(audioExecutorService); + audioOut.doInit(); + addOutput(audioOut, false); + } + } + + /** + * Stops the driver and releases all RTMP stream resources. + * + * @throws SensorHubException if shutdown fails + */ + @Override + protected void doStop() throws SensorHubException { + super.doStop(); + shutdown(); + } + + /** + * Releases the reserved port, stops the stream, and terminates executor services. + * + * @throws SensorHubException if executor shutdown is interrupted + */ + private void shutdown() throws SensorHubException { + portSingleton.removeConnection(config.connectionConfig.port); + stopStream(); + try { + stopExecutors(); + } catch (InterruptedException e) { + throw new SensorHubException("Interrupted while stopping executors", e); + } + + } + + /** + * Stops stream processing, waits for the MPEG-TS processor thread to finish, + * closes the stream, and clears the processor reference. + */ + private void stopStream() { + isConnected = false; + synchronized (mpegTsProcessor) { + var mpegts = mpegTsProcessor.get(); + if (mpegts != null) { + mpegts.stopProcessingStream(); + if (mpegts.isAlive()) { + try { + logger.info("Waiting for stream to stop."); + mpegts.join(); + } catch (InterruptedException e) { + logger.error("Interrupted while waiting for stream to stop.", e); + } + } + mpegts.closeStream(); + } + mpegTsProcessor.set(null); + } + } + + /** + * Stops all executor services used by the driver and waits for termination. + * + * @throws InterruptedException if interrupted while waiting for executor termination + */ + private void stopExecutors() throws InterruptedException { + if (executorService != null) { + executorService.shutdownNow(); + executorService.awaitTermination(EXECUTOR_JOIN_TIMEOUT, EXECUTOR_JOIN_TIME_UNIT); + } + if (videoExecutorService != null) { + videoExecutorService.shutdownNow(); + videoExecutorService.awaitTermination(EXECUTOR_JOIN_TIMEOUT, EXECUTOR_JOIN_TIME_UNIT); + } + if (audioExecutorService != null) { + audioExecutorService.shutdownNow(); + audioExecutorService.awaitTermination(EXECUTOR_JOIN_TIMEOUT, EXECUTOR_JOIN_TIME_UNIT); + } + if (heartbeatExecutorService != null) { + heartbeatExecutorService.shutdownNow(); + heartbeatExecutorService.awaitTermination(EXECUTOR_JOIN_TIMEOUT, EXECUTOR_JOIN_TIME_UNIT); + } + } + + /** + * Cleans up module resources before disposal. + * + * @throws SensorHubException if cleanup or shutdown fails + */ + @Override + public void cleanup() throws SensorHubException { + super.cleanup(); + shutdown(); + } + + /** + * Indicates whether the driver is currently started and has an open RTMP stream. + * + * @return {@code true} if the module is started and the RTMP stream is open; + * otherwise {@code false} + */ + @Override + public boolean isConnected() { + return isConnected; + } +} \ No newline at end of file diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortSingleton.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortSingleton.java new file mode 100644 index 000000000..1f1bc4098 --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/RtmpPortSingleton.java @@ -0,0 +1,65 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2026 GeoRobotix Innovative Research, Inc. All Rights Reserved. + ******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.sensor.rtmp; + +import java.util.HashMap; +import java.util.Map; + +/** + * Tracks RTMP listener ports currently reserved by RTMP driver modules. + *

+ * The singleton prevents multiple RTMP driver instances from attempting to listen + * on the same port at the same time. All public methods are synchronized to + * provide simple thread-safe access to the port reservation map. + *

+ */ +public final class RtmpPortSingleton { + private static final RtmpPortSingleton instance = new RtmpPortSingleton(); + + private final Map urls = new HashMap<>(); + + public static RtmpPortSingleton getInstance() { + return instance; + } + + /** + * Attempts to reserve a port for the specified module. + *

+ * If the port is not already reserved, this method records the module unique + * identifier and returns {@code null}. If the port is already reserved, this + * method returns the unique identifier of the module that currently owns it. + *

+ * + * @param url RTMP listener port to reserve + * @param moduleUid unique identifier of the module requesting the port + * @return {@code null} if the reservation succeeded; otherwise the unique + * identifier of the module currently using the port + */ + public synchronized String addConnection(int url, String moduleUid) { + if (urls.containsKey(url)) { + return urls.get(url); + } else { + urls.put(url, moduleUid); + return null; + } + } + + /** + * Releases a previously reserved RTMP listener port. + * + * @param url RTMP listener port to release + */ + public synchronized void removeConnection(int url) { + urls.remove(url); + } +} diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java new file mode 100644 index 000000000..5f8900d5f --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/ConnectionConfig.java @@ -0,0 +1,43 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2026 GeoRobotix Innovative Research, Inc. All Rights Reserved. + ******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.sensor.rtmp.config; + +import org.sensorhub.api.config.DisplayInfo; + +public class ConnectionConfig { + + /* + @DisplayInfo.Required + @DisplayInfo(label = "Generate Random Stream Key", desc = "Enable to generate and append a random hex string to the path. " + + "Recommended for security. Only enable on first init, otherwise path will include multiple keys. ") + public boolean generateRandomStreamKey = true; + + */ + + @DisplayInfo.Required + @DisplayInfo(label = "Host", desc = "Domain listening for an RTMP connection request. Unspecified should work " + + "for most cases.") + public HostType host = HostType.UNSPECIFIED; + + @DisplayInfo.Required + @DisplayInfo(label = "Port", desc = "Port listening for an RTMP connection request.") + @DisplayInfo.ValueRange(min = 1, max = 65535) + public int port = 1935; + + /* + @DisplayInfo(label = "Path", desc = "(Optional) Path to listen for an RTMP connection request. I.e. everything in the URL " + + "after the port.") + public String path = ""; + + */ +} \ No newline at end of file diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java new file mode 100644 index 000000000..cd72f7f5b --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/HostType.java @@ -0,0 +1,26 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2026 GeoRobotix Innovative Research, Inc. All Rights Reserved. + ******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.sensor.rtmp.config; + +public enum HostType { + UNSPECIFIED("0.0.0.0"), + LOCALHOST("localhost"), + DOCKER_INTERNAL("host.docker.internal")/*, + OVERRIDE("")*/; + + public final String host; + + HostType(String host) { + this.host = host; + } +} diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/RtmpConfig.java b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/RtmpConfig.java new file mode 100644 index 000000000..d466a4ba0 --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/java/org/sensorhub/impl/sensor/rtmp/config/RtmpConfig.java @@ -0,0 +1,43 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2026 GeoRobotix Innovative Research, Inc. All Rights Reserved. + ******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.sensor.rtmp.config; + +import org.sensorhub.api.config.DisplayInfo; +import org.sensorhub.api.sensor.PositionConfig; +import org.sensorhub.api.sensor.SensorConfig; + +public class RtmpConfig extends SensorConfig { + @DisplayInfo.Required + @DisplayInfo(label = "Video Stream ID", desc = "Serial number or unique identifier for video stream.") + public String serialNumber = "video001"; + + @DisplayInfo.Required + @DisplayInfo(label = "Connection", desc = "Configuration options for source of RTMP.") + public ConnectionConfig connectionConfig = new ConnectionConfig(); + + /** + * Configuration options for the location and orientation of the sensor. + */ + @DisplayInfo(label = "Position", desc = "Location and orientation of the sensor.") + public PositionConfig positionConfig = new PositionConfig(); + + @Override + public PositionConfig.LLALocation getLocation() { + return positionConfig.location; + } + + @Override + public PositionConfig.EulerOrientation getOrientation() { + return positionConfig.orientation; + } +} diff --git a/sensors/video/sensorhub-driver-rtmp/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider b/sensors/video/sensorhub-driver-rtmp/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider new file mode 100644 index 000000000..e9ee1b390 --- /dev/null +++ b/sensors/video/sensorhub-driver-rtmp/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider @@ -0,0 +1 @@ +org.sensorhub.impl.sensor.rtmp.RtmpDescriptor \ No newline at end of file