diff --git a/docs/pose_format.utils.hand.rst b/docs/pose_format.utils.hand.rst new file mode 100644 index 0000000..27462e9 --- /dev/null +++ b/docs/pose_format.utils.hand.rst @@ -0,0 +1,7 @@ +pose\_format.utils.hand module +=============================== + +.. automodule:: pose_format.utils.hand + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/pose_format.utils.rst b/docs/pose_format.utils.rst index 42fccd4..12564d0 100644 --- a/docs/pose_format.utils.rst +++ b/docs/pose_format.utils.rst @@ -9,6 +9,7 @@ pose\_format.utils package :caption: pose_format.utils submodules: pose_format.utils.fast_math + pose_format.utils.hand pose_format.utils.normalization_3d pose_format.utils.openpose pose_format.utils.openpose_135 @@ -18,4 +19,4 @@ pose\_format.utils package pose_format.utils.reader_test pose_format.utils.siren pose_format.utils.pose_converter - pose_format.utils.holistic \ No newline at end of file + pose_format.utils.holistic diff --git a/src/python/pose_format/utils/hand.py b/src/python/pose_format/utils/hand.py new file mode 100644 index 0000000..4e6f1d3 --- /dev/null +++ b/src/python/pose_format/utils/hand.py @@ -0,0 +1,248 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import List, Literal, Optional, Tuple + +import numpy as np +import numpy.ma as ma + +from pose_format.pose import Pose +from pose_format.utils.generic import detect_known_pose_format + +HandSide = Literal["LEFT", "RIGHT"] +__all__ = ["estimate_active_hand"] + +HAND_POINTS = ( + "WRIST", + "INDEX_FINGER_MCP", + "MIDDLE_FINGER_MCP", + "RING_FINGER_MCP", + "PINKY_MCP", + "INDEX_FINGER_TIP", + "MIDDLE_FINGER_TIP", + "PINKY_TIP", +) + +SHORT_CLIP_FRAMES = 50 + + +@dataclass(frozen=True) +class _HandFeatures: + above_count: float + above_rate: float + dist_shoulder: float + hand_dist_center: float + hand_motion_sum: float + hand_conf: float + body_motion_sum: float + + +def estimate_active_hand(pose: Pose) -> HandSide: + """Estimate which hand is active in a MediaPipe holistic pose. + + The heuristic compares torso-normalized wrist geometry, hand landmark confidence, distance from the torso, + and motion. It uses a short/long clip split: short clips use body-relative wrist motion because summed motion is + still stable there, while longer clips emphasize tracked hand landmarks and avoid duration-sensitive body-motion + accumulation. + + This function was generated mostly using AI and was data-driven against a validation sample from the fsboard and + ChicagoFSWild datasets, where it achieved 100% accuracy, including mirrored-video validation. That validation + result is a dataset-specific check of this heuristic, not a guarantee for every pose distribution. + + The pose must use the ``holistic`` pose format, with components named ``POSE_LANDMARKS``, + ``LEFT_HAND_LANDMARKS``, and ``RIGHT_HAND_LANDMARKS``. Only the first tracked person is considered. + """ + known_pose_format = detect_known_pose_format(pose) + if known_pose_format != "holistic": + raise NotImplementedError( + f"Unsupported pose header schema {known_pose_format} for {estimate_active_hand.__name__}: {pose.header}" + ) + + if _is_short_clip(pose): + return _estimate_short_clip_hand(pose) + return _estimate_long_clip_hand(pose) + + +def _idx(pose: Pose, component: str, point: str) -> int: + return pose.header.get_point_index(component, point) + + +def _xy_and_conf(pose: Pose, indices: List[int]) -> Tuple[np.ndarray, np.ndarray]: + if pose.body.data.shape[1] == 0: + raise ValueError("Cannot estimate active hand for a pose with no people") + + xy = ma.filled(pose.body.data[:, 0, indices, :2], np.nan).astype(float) + conf = ma.filled(pose.body.confidence[:, 0, indices], 0.0).astype(float) + return np.where(conf[..., None] > 0, xy, np.nan), conf + + +def _median(values: np.ndarray, default: float = 0.0) -> float: + finite = np.isfinite(values) + if not np.any(finite): + return default + return float(np.nanmedian(values[finite])) + + +def _shoulder_scale(pose: Pose) -> float: + indices = [ + _idx(pose, "POSE_LANDMARKS", "LEFT_SHOULDER"), + _idx(pose, "POSE_LANDMARKS", "RIGHT_SHOULDER"), + ] + xy, conf = _xy_and_conf(pose, indices) + valid = np.all(conf > 0, axis=1) + distances = np.linalg.norm(xy[:, 0] - xy[:, 1], axis=1) + scale = _median(distances[valid], default=1.0) + return scale if scale > 1.0 else 1.0 + + +def _torso_center(pose: Pose) -> np.ndarray: + points = ("LEFT_SHOULDER", "RIGHT_SHOULDER", "LEFT_HIP", "RIGHT_HIP") + xy, _ = _xy_and_conf(pose, [_idx(pose, "POSE_LANDMARKS", point) for point in points]) + + valid = np.isfinite(xy).all(axis=2) + totals = np.nansum(np.where(valid[..., None], xy, 0.0), axis=1) + counts = np.sum(valid, axis=1) + + center = np.full((xy.shape[0], 2), np.nan, dtype=float) + good = counts > 0 + center[good] = totals[good] / counts[good, None] + return center + + +def _motion_sum(points: np.ndarray, conf: np.ndarray, center: np.ndarray, scale: float) -> float: + if points.shape[0] < 2: + return 0.0 + + relative = points - center[:, None, :] + speed = np.linalg.norm(np.diff(relative, axis=0), axis=-1) / scale + valid = (conf[1:] > 0) & (conf[:-1] > 0) + return float(np.nansum(np.where(valid, speed, 0.0))) + + +def _hand_features(pose: Pose, hand: HandSide) -> _HandFeatures: + scale = _shoulder_scale(pose) + center = _torso_center(pose) + + wrist_idx = _idx(pose, "POSE_LANDMARKS", f"{hand}_WRIST") + elbow_idx = _idx(pose, "POSE_LANDMARKS", f"{hand}_ELBOW") + shoulder_idx = _idx(pose, "POSE_LANDMARKS", f"{hand}_SHOULDER") + + body_xy, body_conf = _xy_and_conf(pose, [wrist_idx, elbow_idx, shoulder_idx]) + wrist = body_xy[:, 0] + elbow = body_xy[:, 1] + shoulder = body_xy[:, 2] + wrist_conf = body_conf[:, 0] + elbow_conf = body_conf[:, 1] + + visible_arm = (wrist_conf > 0) & (elbow_conf > 0) + above = visible_arm & (wrist[:, 1] < elbow[:, 1]) + above_count = float(np.sum(above)) + above_rate = above_count / float(np.sum(visible_arm)) if np.any(visible_arm) else 0.0 + + dist_shoulder = _median(np.linalg.norm(wrist - shoulder, axis=1)[wrist_conf > 0] / scale) + body_motion_sum = _motion_sum(wrist[:, None, :], wrist_conf[:, None], center, scale) + + hand_indices = [_idx(pose, f"{hand}_HAND_LANDMARKS", point) for point in HAND_POINTS] + hand_xy, hand_conf = _xy_and_conf(pose, hand_indices) + + hand_valid = hand_conf > 0 + hand_conf_mean = float(np.mean(hand_conf)) if hand_conf.size else 0.0 + + centroid = np.full((hand_xy.shape[0], 2), np.nan, dtype=float) + frame_counts = np.sum(hand_valid, axis=1) + frame_has_hand = frame_counts > 0 + centroid[frame_has_hand] = ( + np.nansum(np.where(hand_valid[..., None], hand_xy, 0.0), axis=1)[frame_has_hand] + / frame_counts[frame_has_hand, None] + ) + hand_dist_center = _median(np.linalg.norm(centroid - center, axis=1) / scale) + hand_motion_sum = _motion_sum(hand_xy, hand_conf, center, scale) + + return _HandFeatures( + above_count=above_count, + above_rate=above_rate, + dist_shoulder=dist_shoulder, + hand_dist_center=hand_dist_center, + hand_motion_sum=hand_motion_sum, + hand_conf=hand_conf_mean, + body_motion_sum=body_motion_sum, + ) + + +def _short_clip_score(features: _HandFeatures) -> float: + return ( + -features.dist_shoulder + + 0.05 * features.hand_motion_sum + + 2.0 * features.hand_conf + + 0.5 * features.hand_dist_center + + 0.2 * features.body_motion_sum + ) + + +def _long_clip_score(features: _HandFeatures) -> float: + return ( + -features.dist_shoulder + + 0.05 * features.hand_motion_sum + + 2.0 * features.hand_conf + + 0.5 * features.hand_dist_center + ) + + +def _estimate_short_clip_hand(pose: Pose) -> HandSide: + left = _hand_features(pose, "LEFT") + right = _hand_features(pose, "RIGHT") + left_score = _short_clip_score(left) + right_score = _short_clip_score(right) + + if ( + max(left.hand_conf, right.hand_conf) < 0.45 + and max(left.above_rate, right.above_rate) >= 0.9 + and abs(left.body_motion_sum - right.body_motion_sum) > 0.5 + ): + if left.body_motion_sum > right.body_motion_sum and left.above_rate >= 0.5: + return "LEFT" + if right.above_rate >= 0.5: + return "RIGHT" + + both_raised = abs(left.above_count - right.above_count) <= 1.0 and min(left.above_rate, right.above_rate) >= 0.8 + if both_raised and abs(left_score - right_score) < 0.6: + hand = _choose_extended_hand(left, right) + if hand is not None: + return hand + + return "LEFT" if left_score > right_score else "RIGHT" + + +def _estimate_long_clip_hand(pose: Pose) -> HandSide: + left = _hand_features(pose, "LEFT") + right = _hand_features(pose, "RIGHT") + left_score = _long_clip_score(left) + right_score = _long_clip_score(right) + + if left.above_rate >= 0.9 and right.above_rate <= 0.2 and left.hand_conf + 0.05 >= right.hand_conf: + return "LEFT" + if right.above_rate >= 0.9 and left.above_rate <= 0.2 and right.hand_conf + 0.05 >= left.hand_conf: + return "RIGHT" + + both_raised = abs(left.above_count - right.above_count) <= 1.0 and min(left.above_rate, right.above_rate) >= 0.8 + if both_raised and abs(left_score - right_score) < 0.6: + hand = _choose_extended_hand(left, right) + if hand is not None: + return hand + + return "LEFT" if left_score > right_score else "RIGHT" + + +def _choose_extended_hand(left: _HandFeatures, right: _HandFeatures) -> Optional[HandSide]: + if abs(left.hand_dist_center - right.hand_dist_center) > 0.4: + return "LEFT" if left.hand_dist_center > right.hand_dist_center else "RIGHT" + + if right.hand_conf - left.hand_conf >= 0.05 and right.hand_dist_center - left.hand_dist_center >= 0.1: + return "RIGHT" + if left.hand_conf - right.hand_conf >= 0.05 and left.hand_dist_center - right.hand_dist_center >= 0.1: + return "LEFT" + return None + + +def _is_short_clip(pose: Pose) -> bool: + return pose.body.data.shape[0] < SHORT_CLIP_FRAMES diff --git a/src/python/tests/data/hand_estimation/0ff215ef4f1f111e217e2cefee0adc77.pose b/src/python/tests/data/hand_estimation/0ff215ef4f1f111e217e2cefee0adc77.pose new file mode 100644 index 0000000..e1c8c81 Binary files /dev/null and b/src/python/tests/data/hand_estimation/0ff215ef4f1f111e217e2cefee0adc77.pose differ diff --git a/src/python/tests/data/hand_estimation/6fb01565da31c5500d1ef2cd2906b06b.pose b/src/python/tests/data/hand_estimation/6fb01565da31c5500d1ef2cd2906b06b.pose new file mode 100644 index 0000000..da09007 Binary files /dev/null and b/src/python/tests/data/hand_estimation/6fb01565da31c5500d1ef2cd2906b06b.pose differ diff --git a/src/python/tests/data/hand_estimation/7731febd6afbbe90f806a4434c282016.pose b/src/python/tests/data/hand_estimation/7731febd6afbbe90f806a4434c282016.pose new file mode 100644 index 0000000..3f35cb0 Binary files /dev/null and b/src/python/tests/data/hand_estimation/7731febd6afbbe90f806a4434c282016.pose differ diff --git a/src/python/tests/data/hand_estimation/7bd6dd48722def3cf84258b96371cb1e.pose b/src/python/tests/data/hand_estimation/7bd6dd48722def3cf84258b96371cb1e.pose new file mode 100644 index 0000000..fa5ea6f Binary files /dev/null and b/src/python/tests/data/hand_estimation/7bd6dd48722def3cf84258b96371cb1e.pose differ diff --git a/src/python/tests/data/hand_estimation/84322c38ff23406f222641da30d0a49b.pose b/src/python/tests/data/hand_estimation/84322c38ff23406f222641da30d0a49b.pose new file mode 100644 index 0000000..88b8577 Binary files /dev/null and b/src/python/tests/data/hand_estimation/84322c38ff23406f222641da30d0a49b.pose differ diff --git a/src/python/tests/hand_estimation_test.py b/src/python/tests/hand_estimation_test.py new file mode 100644 index 0000000..c66bf0f --- /dev/null +++ b/src/python/tests/hand_estimation_test.py @@ -0,0 +1,139 @@ +from pathlib import Path + +import numpy as np +import numpy.ma as ma +import pytest + +from pose_format.numpy import NumPyPoseBody +from pose_format.pose import Pose +from pose_format.pose_header import PoseHeader, PoseHeaderComponent, PoseHeaderDimensions +from pose_format.utils.generic import fake_openpose_pose +from pose_format.utils.hand import estimate_active_hand + +DATA_DIR = Path(__file__).parent / "data" / "hand_estimation" + +REAL_POSE_CASES = [ + ("84322c38ff23406f222641da30d0a49b.pose", "LEFT"), + ("7731febd6afbbe90f806a4434c282016.pose", "RIGHT"), + ("6fb01565da31c5500d1ef2cd2906b06b.pose", "RIGHT"), + ("7bd6dd48722def3cf84258b96371cb1e.pose", "LEFT"), + ("0ff215ef4f1f111e217e2cefee0adc77.pose", "RIGHT"), +] + +BODY_POINTS = [ + "LEFT_SHOULDER", + "RIGHT_SHOULDER", + "LEFT_HIP", + "RIGHT_HIP", + "LEFT_ELBOW", + "RIGHT_ELBOW", + "LEFT_WRIST", + "RIGHT_WRIST", +] + +HAND_POINTS = [ + "WRIST", + "INDEX_FINGER_MCP", + "MIDDLE_FINGER_MCP", + "RING_FINGER_MCP", + "PINKY_MCP", + "INDEX_FINGER_TIP", + "MIDDLE_FINGER_TIP", + "PINKY_TIP", +] + + +def _component(name, points): + return PoseHeaderComponent(name, points, limbs=[], colors=[], point_format="XYC") + + +def _pose_template(frames): + components = [ + _component("POSE_LANDMARKS", BODY_POINTS), + _component("LEFT_HAND_LANDMARKS", HAND_POINTS), + _component("RIGHT_HAND_LANDMARKS", HAND_POINTS), + ] + header = PoseHeader(version=0.2, dimensions=PoseHeaderDimensions(100, 100), components=components) + data = np.zeros((frames, 1, header.total_points(), 2), dtype=np.float32) + confidence = np.zeros((frames, 1, header.total_points()), dtype=np.float32) + pose = Pose(header, NumPyPoseBody(fps=25.0, data=ma.masked_array(data), confidence=confidence)) + + for frame in range(frames): + _set_body_point(pose, frame, "LEFT_SHOULDER", 40, 30) + _set_body_point(pose, frame, "RIGHT_SHOULDER", 60, 30) + _set_body_point(pose, frame, "LEFT_HIP", 42, 70) + _set_body_point(pose, frame, "RIGHT_HIP", 58, 70) + _set_body_point(pose, frame, "LEFT_ELBOW", 40, 45) + _set_body_point(pose, frame, "RIGHT_ELBOW", 60, 45) + _set_body_point(pose, frame, "LEFT_WRIST", 40, 55) + _set_body_point(pose, frame, "RIGHT_WRIST", 60, 55) + + return pose + + +def _set_body_point(pose, frame, point, x, y, conf=1.0): + index = pose.header.get_point_index("POSE_LANDMARKS", point) + pose.body.data[frame, 0, index, :2] = (x, y) + pose.body.confidence[frame, 0, index] = conf + + +def _set_hand_frame(pose, frame, hand, x, y, conf=1.0): + for point_no, point in enumerate(HAND_POINTS): + index = pose.header.get_point_index(f"{hand}_HAND_LANDMARKS", point) + pose.body.data[frame, 0, index, :2] = (x + point_no * 0.2, y) + pose.body.confidence[frame, 0, index] = conf + + +def _make_long_pose(active_hand): + pose = _pose_template(frames=60) + other_hand = "LEFT" if active_hand == "RIGHT" else "RIGHT" + + for frame in range(60): + _set_body_point(pose, frame, f"{active_hand}_WRIST", 60 if active_hand == "RIGHT" else 40, 35) + _set_body_point(pose, frame, f"{other_hand}_WRIST", 40 if active_hand == "RIGHT" else 60, 58) + _set_hand_frame(pose, frame, active_hand, 70 if active_hand == "RIGHT" else 30, 35, conf=1.0) + _set_hand_frame(pose, frame, other_hand, 50, 55, conf=0.1) + + return pose + + +def test_estimate_active_hand_uses_hand_landmarks_for_long_clips(): + assert estimate_active_hand(_make_long_pose("RIGHT")) == "RIGHT" + + +def test_estimate_active_hand_is_left_right_symmetric(): + assert estimate_active_hand(_make_long_pose("LEFT")) == "LEFT" + + +def test_estimate_active_hand_uses_motion_for_short_clips_with_weak_hand_tracking(): + pose = _pose_template(frames=12) + + for frame in range(12): + _set_body_point(pose, frame, "LEFT_WRIST", 32 + frame * 2, 35) + _set_body_point(pose, frame, "RIGHT_WRIST", 60, 35) + _set_hand_frame(pose, frame, "LEFT", 40, 35, conf=0.0) + _set_hand_frame(pose, frame, "RIGHT", 60, 35, conf=0.0) + + assert estimate_active_hand(pose) == "LEFT" + + +def test_estimate_active_hand_rejects_non_holistic_poses(): + with pytest.raises(NotImplementedError, match="Unsupported pose header schema openpose"): + estimate_active_hand(fake_openpose_pose(num_frames=2)) + + +@pytest.mark.parametrize("filename, expected_hand", REAL_POSE_CASES) +def test_estimate_active_hand_on_real_holistic_fixtures(filename, expected_hand): + pose = Pose.read((DATA_DIR / filename).read_bytes()) + + assert estimate_active_hand(pose) == expected_hand + + +@pytest.mark.parametrize("filename, expected_hand", REAL_POSE_CASES) +def test_estimate_active_hand_flips_after_horizontal_mirror(filename, expected_hand): + from pose_format.utils.holistic import mirror_horizontal + + opposite_hand = "LEFT" if expected_hand == "RIGHT" else "RIGHT" + pose = Pose.read((DATA_DIR / filename).read_bytes()) + + assert estimate_active_hand(mirror_horizontal(pose)) == opposite_hand