1616"""
1717
1818import math
19+ import typing as ty
1920from dataclasses import dataclass
20- from typing import List , NamedTuple , Optional
21+ from typing import NamedTuple
2122
2223import cv2
2324import numpy
2425
26+ from scenedetect .detector import Detector , Event , EventType
27+ from scenedetect .frame_timecode import FrameTimecode
2528from scenedetect .scene_detector import FlashFilter , SceneDetector
29+ from scenedetect .stats_manager import StatsManager
2630
2731
2832def _mean_pixel_distance (left : numpy .ndarray , right : numpy .ndarray ) -> float :
@@ -97,7 +101,7 @@ class _FrameData:
97101 """Frame saturation map [2D 8-bit]."""
98102 lum : numpy .ndarray
99103 """Frame luma/brightness map [2D 8-bit]."""
100- edges : Optional [numpy .ndarray ]
104+ edges : ty . Optional [numpy .ndarray ]
101105 """Frame edge map [2D 8-bit, edges are 255, non edges 0]. Affected by `kernel_size`."""
102106
103107 def __init__ (
@@ -106,7 +110,7 @@ def __init__(
106110 min_scene_len : int = 15 ,
107111 weights : "ContentDetector.Components" = DEFAULT_COMPONENT_WEIGHTS ,
108112 luma_only : bool = False ,
109- kernel_size : Optional [int ] = None ,
113+ kernel_size : ty . Optional [int ] = None ,
110114 filter_mode : FlashFilter .Mode = FlashFilter .Mode .MERGE ,
111115 ):
112116 """
@@ -126,17 +130,17 @@ def __init__(
126130 super ().__init__ ()
127131 self ._threshold : float = threshold
128132 self ._min_scene_len : int = min_scene_len
129- self ._last_above_threshold : Optional [int ] = None
130- self ._last_frame : Optional [ContentDetector ._FrameData ] = None
133+ self ._last_above_threshold : ty . Optional [int ] = None
134+ self ._last_frame : ty . Optional [ContentDetector ._FrameData ] = None
131135 self ._weights : ContentDetector .Components = weights
132136 if luma_only :
133137 self ._weights = ContentDetector .LUMA_ONLY_WEIGHTS
134- self ._kernel : Optional [numpy .ndarray ] = None
138+ self ._kernel : ty . Optional [numpy .ndarray ] = None
135139 if kernel_size is not None :
136140 if kernel_size < 3 or kernel_size % 2 == 0 :
137141 raise ValueError ("kernel_size must be odd integer >= 3" )
138142 self ._kernel = numpy .ones ((kernel_size , kernel_size ), numpy .uint8 )
139- self ._frame_score : Optional [float ] = None
143+ self ._frame_score : ty . Optional [float ] = None
140144 self ._flash_filter = FlashFilter (mode = filter_mode , length = min_scene_len )
141145
142146 def get_metrics (self ):
@@ -187,7 +191,7 @@ def _calculate_frame_score(self, frame_num: int, frame_img: numpy.ndarray) -> fl
187191 self ._last_frame = ContentDetector ._FrameData (hue , sat , lum , edges )
188192 return frame_score
189193
190- def process_frame (self , frame_num : int , frame_img : numpy .ndarray ) -> List [int ]:
194+ def process_frame (self , frame_num : int , frame_img : numpy .ndarray ) -> ty . List [int ]:
191195 """Process the next frame. `frame_num` is assumed to be sequential.
192196
193197 Args:
@@ -196,7 +200,7 @@ def process_frame(self, frame_num: int, frame_img: numpy.ndarray) -> List[int]:
196200 frame_img (numpy.ndarray or None): Video frame corresponding to `frame_img`.
197201
198202 Returns:
199- List[int]: List of frames where scene cuts have been detected. There may be 0
203+ ty. List[int]: List of frames where scene cuts have been detected. There may be 0
200204 or more frames in the list, and not necessarily the same as frame_num.
201205 """
202206 self ._frame_score = self ._calculate_frame_score (frame_num , frame_img )
@@ -237,3 +241,205 @@ def _detect_edges(self, lum: numpy.ndarray) -> numpy.ndarray:
237241 @property
238242 def event_buffer_length (self ) -> int :
239243 return self ._flash_filter .max_behind
244+
245+
246+ # TODO: Make ContentDetector implement both interfaces once ContentDetector2 is at feature parity.
247+ # Currently it is missing enforcement of min_scene_length, as FlashFilter needs to be transitioned
248+ # to work off of time instead of frames.
class ContentDetector2(Detector):
    """Detects fast cuts using changes in colour and intensity between frames.

    The difference is calculated in the HSV color space, and compared against a set threshold to
    determine when a fast cut has occurred.

    Note: `min_scene_len` is stored and handed to the flash filter, but :meth:`process` does not
    run events through that filter yet, so the minimum scene length is currently not enforced.
    """

    # TODO: Come up with some good weights for a new default if there is one that can pass
    # a wider variety of test cases.
    class Components(NamedTuple):
        """Components that make up a frame's score, and their default values."""

        delta_hue: float = 1.0
        """Difference between pixel hue values of adjacent frames."""
        delta_sat: float = 1.0
        """Difference between pixel saturation values of adjacent frames."""
        delta_lum: float = 1.0
        """Difference between pixel luma (brightness) values of adjacent frames."""
        delta_edges: float = 0.0
        """Difference between calculated edges of adjacent frames.

        Edge differences are typically larger than the other components, so the detection
        threshold may need to be adjusted accordingly."""

    DEFAULT_COMPONENT_WEIGHTS = Components()
    """Default component weights. Actual default values are specified in :class:`Components`
    to allow adding new components without breaking existing usage."""

    LUMA_ONLY_WEIGHTS = Components(
        delta_hue=0.0,
        delta_sat=0.0,
        delta_lum=1.0,
        delta_edges=0.0,
    )
    """Component weights to use if `luma_only` is set."""

    FRAME_SCORE_KEY = "content_val"
    """Key in statsfile representing the final frame score after weighed by specified components."""

    METRIC_KEYS = [FRAME_SCORE_KEY, *Components._fields]
    """All statsfile keys this detector produces."""

    @dataclass
    class _FrameData:
        """Data calculated for a given frame."""

        hue: numpy.ndarray
        """Frame hue map [2D 8-bit]."""
        sat: numpy.ndarray
        """Frame saturation map [2D 8-bit]."""
        lum: numpy.ndarray
        """Frame luma/brightness map [2D 8-bit]."""
        edges: ty.Optional[numpy.ndarray]
        """Frame edge map [2D 8-bit, edges are 255, non edges 0]. Affected by `kernel_size`."""

    def __init__(
        self,
        threshold: float = 27.0,
        min_scene_len: ty.Union[int, FrameTimecode] = 15,
        weights: "ContentDetector2.Components" = DEFAULT_COMPONENT_WEIGHTS,
        luma_only: bool = False,
        kernel_size: ty.Optional[int] = None,
        filter_mode: FlashFilter.Mode = FlashFilter.Mode.MERGE,
    ):
        """
        Arguments:
            threshold: Threshold the average change in pixel intensity must exceed to trigger a cut.
            min_scene_len: Once a cut is detected, this many frames must pass before a new one can
                be added to the scene list. Can be an int or FrameTimecode type.
            weights: Weight to place on each component when calculating frame score
                (`content_val` in a statsfile, the value `threshold` is compared against).
            luma_only: If True, only considers changes in the luminance channel of the video.
                Equivalent to specifying `weights` as :data:`ContentDetector2.LUMA_ONLY_WEIGHTS`.
                Overrides `weights` if both are set.
            kernel_size: Size of kernel for expanding detected edges. Must be odd integer
                greater than or equal to 3. If None, automatically set using video resolution.
            filter_mode: Mode to use when filtering cuts to meet `min_scene_len`.

        Raises:
            ValueError: If `kernel_size` is given but is even or smaller than 3.
        """
        super().__init__()
        self._threshold: float = threshold
        self._min_scene_len: ty.Union[int, FrameTimecode] = min_scene_len
        self._last_above_threshold: ty.Optional[FrameTimecode] = None
        self._last_frame: ty.Optional[ContentDetector2._FrameData] = None
        # `luma_only` takes precedence over any explicitly supplied weights.
        self._weights: ContentDetector2.Components = (
            ContentDetector2.LUMA_ONLY_WEIGHTS if luma_only else weights
        )
        self._kernel: ty.Optional[numpy.ndarray] = None
        if kernel_size is not None:
            if kernel_size < 3 or kernel_size % 2 == 0:
                raise ValueError("kernel_size must be odd integer >= 3")
            self._kernel = numpy.ones((kernel_size, kernel_size), numpy.uint8)
        self._frame_score: ty.Optional[float] = None
        self._flash_filter = FlashFilter(mode=filter_mode, length=min_scene_len)
        self._stats: ty.Optional[StatsManager] = None

    def get_metrics(self):
        """Return the list of statsfile keys this detector produces (:data:`METRIC_KEYS`)."""
        return ContentDetector2.METRIC_KEYS

    def set_stats_manager(self, stats: StatsManager):
        """Attach the stats manager that per-frame metrics are recorded to.

        Once one is attached, edges are computed for every frame so that all components can be
        recorded, regardless of the `delta_edges` weight.
        """
        self._stats = stats

    def _calculate_frame_score(self, frame: numpy.ndarray, timecode: FrameTimecode) -> float:
        """Calculate score representing relative amount of motion in `frame` compared to
        the frame passed on the previous call (returns 0.0 on the first call).

        Arguments:
            frame: Image to score. It is converted with `cv2.COLOR_BGR2HSV`.
            timecode: Timecode of `frame`; its `frame_num` is the key metrics are recorded under.

        Returns:
            Weighted sum of the component deltas divided by the sum of absolute weights.
        """
        # TODO: Add option to enable motion estimation before calculating score components.
        # TODO: Investigate methods of performing cheaper alternatives, e.g. shifting or resizing
        # the frame to simulate camera movement, using optical flow, etc...

        # Convert image into HSV colorspace.
        hue, sat, lum = cv2.split(cv2.cvtColor(frame, cv2.COLOR_BGR2HSV))

        # Performance: Only calculate edges if we have to.
        calculate_edges: bool = (self._weights.delta_edges > 0.0) or self._stats is not None
        edges = self._detect_edges(lum) if calculate_edges else None

        previous = self._last_frame
        if previous is None:
            # Need another frame to compare with for score calculation.
            self._last_frame = ContentDetector2._FrameData(hue, sat, lum, edges)
            return 0.0

        # The previous frame may have no edge map even though this one does (e.g. a stats
        # manager was attached after the first frame was processed). There is nothing to
        # compare against in that case, so the edge delta is treated as 0.0 for this frame.
        can_compare_edges = edges is not None and previous.edges is not None
        score_components = ContentDetector2.Components(
            delta_hue=_mean_pixel_distance(hue, previous.hue),
            delta_sat=_mean_pixel_distance(sat, previous.sat),
            delta_lum=_mean_pixel_distance(lum, previous.lum),
            delta_edges=(
                _mean_pixel_distance(edges, previous.edges) if can_compare_edges else 0.0
            ),
        )

        # NOTE(review): if every weight is zero the divisor below is zero and this raises
        # ZeroDivisionError. Consider validating `weights` in __init__.
        frame_score: float = sum(
            component * weight for (component, weight) in zip(score_components, self._weights)
        ) / sum(abs(weight) for weight in self._weights)

        # Record components and frame score if needed for analysis.
        if self._stats is not None:
            metrics = {self.FRAME_SCORE_KEY: frame_score}
            metrics.update(score_components._asdict())
            self._stats.set_metrics(timecode.frame_num, metrics)

        # Store all data required to calculate the next frame's score.
        self._last_frame = ContentDetector2._FrameData(hue, sat, lum, edges)
        return frame_score

    def process(self, frame: numpy.ndarray, timecode: FrameTimecode) -> ty.List[Event]:
        """Process the next frame. Frames are assumed to be passed sequentially.

        Args:
            frame: Video frame to process.
            timecode: Timecode corresponding to `frame`.

        Returns:
            A list holding a single `EventType.CUT` event at `timecode` if the frame score is
            greater than or equal to the threshold, otherwise an empty list.
        """
        self._frame_score = self._calculate_frame_score(frame, timecode)
        # Defensive: `_calculate_frame_score` as written here always returns a float.
        if self._frame_score is None:
            return []

        above_threshold: bool = self._frame_score >= self._threshold
        # TODO: Need to fix FlashFilter so we can enforce min_scene_length. We should be able to
        # just return `self._flash_filter.filter(timecode, above_threshold)` here.
        if above_threshold:
            return [Event(type=EventType.CUT, time=timecode)]
        return []

    def _detect_edges(self, lum: numpy.ndarray) -> numpy.ndarray:
        """Detect edges using the luma channel of a frame.

        Arguments:
            lum: 2D 8-bit image representing the luma channel of a frame.

        Returns:
            2D 8-bit image of the same size as the input, where pixels with values of 255
            represent edges, and all other pixels are 0.
        """
        # Initialize kernel lazily from the frame size (width, height) if none was specified.
        if self._kernel is None:
            kernel_size = _estimated_kernel_size(lum.shape[1], lum.shape[0])
            self._kernel = numpy.ones((kernel_size, kernel_size), numpy.uint8)

        # Estimate levels for thresholding.
        # TODO: Add config file entries for sigma, aperture/kernel size, etc.
        sigma: float = 1.0 / 3.0
        median = numpy.median(lum)
        low = int(max(0, (1.0 - sigma) * median))
        high = int(min(255, (1.0 + sigma) * median))

        # Calculate edges using Canny algorithm, and reduce noise by dilating the edges.
        # This increases edge overlap leading to improved robustness against noise and slow
        # camera movement. Note that very large kernel sizes can negatively affect accuracy.
        edges = cv2.Canny(lum, low, high)
        return cv2.dilate(edges, self._kernel)

    @property
    def event_buffer_length(self) -> int:
        """The flash filter's `max_behind` value."""
        return self._flash_filter.max_behind
0 commit comments