From 6f5b7ae9e9b4dbea657a634874e1caaf4785d640 Mon Sep 17 00:00:00 2001 From: Alfredo Dal'Ava Junior Date: Tue, 18 May 2021 03:00:04 -0300 Subject: [PATCH 1/3] add generic TelemetryServer class --- RMS/TelemetryServer.py | 50 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 RMS/TelemetryServer.py diff --git a/RMS/TelemetryServer.py b/RMS/TelemetryServer.py new file mode 100644 index 000000000..d538daffd --- /dev/null +++ b/RMS/TelemetryServer.py @@ -0,0 +1,50 @@ +""" Generic Telemetry web service """ + +from http.server import BaseHTTPRequestHandler, HTTPServer, ThreadingHTTPServer +import json +from socketserver import BaseRequestHandler +import time +import threading +from typing import Callable, Tuple + +class TelemetryServer(ThreadingHTTPServer): + data = {} + + def __init__(self, ip, port): + super().__init__((ip, port), TelemetryHandler) + + + def set_data(self, data_obj): + self.data = data_obj + + def run(self): + server_thread = threading.Thread(target=self.serve_forever) + server_thread.daemon_threads = True + server_thread.start() + +class TelemetryHandler(BaseHTTPRequestHandler): + + def do_GET(self): + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + + self.wfile.write(bytes(json.dumps(self.server.data), "utf-8")) + self.wfile.flush() + + +if __name__ == "__main__": + server = TelemetryServer('127.0.0.1', 5000) + server.run() + + print("server started") + server.set_data({'hello': 'world', 'received': 'ok'}) + + + input("Press any key to terminate the program") + server.set_data({'hello': 'world', 'received': 'err'}) + input("Press any key to terminate the program") + + server.shutdown() + print("Server stopped.") + From 1f961c0244da70541a0a37e6a729975860c5b692 Mon Sep 17 00:00:00 2001 From: Alfredo Dal'Ava Junior Date: Tue, 18 May 2021 03:01:38 -0300 Subject: [PATCH 2/3] add telemetry data service to monitor capture progress --- RMS/BufferedCapture.py | 71 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 7 deletions(-) diff --git a/RMS/BufferedCapture.py b/RMS/BufferedCapture.py index 802e45639..792b0969d 100644 --- a/RMS/BufferedCapture.py +++ b/RMS/BufferedCapture.py @@ -18,12 +18,14 @@ import re import time +import datetime import logging from multiprocessing import Process, Event import cv2 from RMS.Misc import ping +from RMS.TelemetryServer import TelemetryServer # Get the logger from the main module log = logging.getLogger("logger") @@ -34,6 +36,18 @@ class BufferedCapture(Process): """ running = False + telemetry = None + + telemetry_data = { + 'status': 'init', + 'fps_expected' : 0, + 'fps_estimated' : 0, + 'block_frames' : 0, + 'dropped_frames' : 0, + 'frame_time_average' : 0, + 'capture_start_time' : '1900-01-01T00:00:00.000000', + 'last_frame_block' : '1900-01-01T00:00:00.000000', + } def __init__(self, array1, startTime1, array2, startTime2, config, video_file=None): """ Populate arrays with (startTime, frames) after startCapture is called. @@ -66,7 +80,25 @@ def __init__(self, array1, startTime1, array2, startTime2, config, video_file=No self.time_for_drop = 1.5*(1.0/config.fps) self.dropped_frames = 0 - + + def startTelemetry(self): + # Start telemetry web service + # TODO: put host and port on config file. Enable/disable on config file + try: + self.telemetry = TelemetryServer('localhost', 2100) + self.telemetry.set_data(self.telemetry_data) + self.telemetry.run() + except: + log.error('Error starting telemetry web service!') + self.telemetry = None + + def stopTelemetry(self): + if self.telemetry is not None: + self.telemetry.shutdown() + + def updateTelemetry(self): + if self.telemetry is not None: + self.telemetry.set_data(self.telemetry_data) def startCapture(self, cameraID=0): @@ -185,6 +217,12 @@ def initVideoDevice(self): def run(self): """ Capture frames. """ + + # number of frames in each capture block + block_frames = 256 + + # Start telemetry web service + self.startTelemetry() # Init the video device device = self.initVideoDevice() @@ -216,6 +254,11 @@ def run(self): log.info('Video device opened!') + self.telemetry_data['capture_start_time'] = datetime.datetime.utcnow().isoformat() + self.telemetry_data['block_frames'] = block_frames + self.telemetry_data['fps_expected'] = self.config.fps + self.telemetry_data['status'] = 'capturing' + self.updateTelemetry() # Throw away first 10 frame for i in range(10): @@ -277,10 +320,10 @@ def run(self): t_frame = 0 + t_frame_total = 0 t_assignment = 0 t_convert = 0 t_block = time.time() - block_frames = 256 log.info('Grabbing a new block of {} frames...'.format(block_frames)) for i in range(block_frames): @@ -289,6 +332,7 @@ def run(self): t1_frame = time.time() ret, frame = device.read() t_frame = time.time() - t1_frame + t_frame_total += t_frame # If the video device was disconnected, wait for reconnection @@ -381,6 +425,20 @@ def run(self): + estimated_fps = round(block_frames/(time.time() - t_block),1) + if self.config.report_dropped_frames: + log.info('Estimated FPS: {:.2f}'.format(estimated_fps)) + + # update telemetry web service data + self.telemetry_data['fps_estimated'] = estimated_fps + self.telemetry_data['dropped_frames'] = self.dropped_frames + self.telemetry_data['frame_time_average'] = round(t_frame_total/block_frames, 3) + self.telemetry_data['last_frame_block'] = datetime.datetime.utcnow().isoformat() + self.updateTelemetry() + + # Switch the frame block buffer flags + first = not first + if self.exit.is_set(): wait_for_reconnect = False log.info('Capture exited!') @@ -400,13 +458,12 @@ def run(self): log.info('New block of raw frames available for compression with starting time: {:s}'.format(str(startTime))) - # Switch the frame block buffer flags - first = not first - if self.config.report_dropped_frames: - log.info('Estimated FPS: {:.3f}'.format(block_frames/(time.time() - t_block))) + + log.info('Releasing video device...') device.release() log.info('Video device released!') - + + self.stopTelemetry() From 0e84b5746d5f96eb02fd0b43f929de9cd4b98dd6 Mon Sep 17 00:00:00 2001 From: Alfredo Dal'Ava Junior Date: Wed, 16 Jun 2021 01:26:45 -0300 Subject: [PATCH 3/3] add video frame to telemetry service --- RMS/BufferedCapture.py | 5 ++++ RMS/TelemetryServer.py | 52 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/RMS/BufferedCapture.py b/RMS/BufferedCapture.py index 792b0969d..4a6705596 100644 --- a/RMS/BufferedCapture.py +++ b/RMS/BufferedCapture.py @@ -100,6 +100,10 @@ def updateTelemetry(self): if self.telemetry is not None: self.telemetry.set_data(self.telemetry_data) + def updateTelemetryFrame(self, frame): + if self.telemetry is not None: + self.telemetry.set_data_frame(frame) + def startCapture(self, cameraID=0): """ Start capture using specified camera. @@ -435,6 +439,7 @@ def run(self): self.telemetry_data['frame_time_average'] = round(t_frame_total/block_frames, 3) self.telemetry_data['last_frame_block'] = datetime.datetime.utcnow().isoformat() self.updateTelemetry() + self.updateTelemetryFrame(frame) # copy the latest frame read # Switch the frame block buffer flags first = not first diff --git a/RMS/TelemetryServer.py b/RMS/TelemetryServer.py index d538daffd..a2429fb8f 100644 --- a/RMS/TelemetryServer.py +++ b/RMS/TelemetryServer.py @@ -5,10 +5,13 @@ from socketserver import BaseRequestHandler import time import threading +import re +import cv2 from typing import Callable, Tuple class TelemetryServer(ThreadingHTTPServer): data = {} + data_frame = None def __init__(self, ip, port): super().__init__((ip, port), TelemetryHandler) @@ -17,6 +20,9 @@ def __init__(self, ip, port): def set_data(self, data_obj): self.data = data_obj + def set_data_frame(self, data_frame_obj): + self.data_frame = data_frame_obj + def run(self): server_thread = threading.Thread(target=self.serve_forever) server_thread.daemon_threads = True @@ -25,12 +31,48 @@ def run(self): class TelemetryHandler(BaseHTTPRequestHandler): def do_GET(self): - self.send_response(200) - self.send_header('Content-type', 'application/json') - self.end_headers() + m = re.search('GET (/.*) HTTP.+', self.requestline) + if m: + self.handle_request(m.group(1)) + else: + self.send_response(500) + self.wfile.write('Internal Error - parsing requestline') + + def handle_request(self, req): + + if req == '/': + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + + self.wfile.write(bytes(json.dumps(self.server.data), "utf-8")) + self.wfile.flush() + elif req == '/last_frame': + if self.server.data_frame is None: + self.send_response(440) + self.send_header('Content-type', 'text/plain') + self.end_headers() + self.wfile.write(bytes('No data', 'utf-8')) + self.wfile.flush() + else: + self.send_response(200) + self.send_header('Content-type', 'image/jpeg') + self.end_headers() + + _, frame = cv2.imencode('.JPEG', self.server.data_frame) + + self.wfile.write(frame) + self.wfile.flush() + + else: + self.send_response(440) + self.send_header('Content-type', 'text/plain') + self.end_headers() + + self.wfile.write(bytes('Not found', 'utf-8')) + self.wfile.flush() + - self.wfile.write(bytes(json.dumps(self.server.data), "utf-8")) - self.wfile.flush() if __name__ == "__main__":