@@ -301,7 +301,7 @@ def test_ttl_expiration(self, db, cf):
301301 txn .commit ()
302302
303303 cf .flush_memtable ()
304- time .sleep (0.5 )
304+ time .sleep (2 )
305305
306306 with db .begin_txn () as txn :
307307 try :
@@ -711,5 +711,181 @@ def test_load_preserves_all_fields(self, temp_db_path):
711711 assert loaded .l0_queue_stall_threshold == original .l0_queue_stall_threshold
712712
713713
714+ class TestCommitHook :
715+ """Tests for commit hook (change data capture) API."""
716+
717+ def test_hook_fires_on_commit (self , db , cf ):
718+ """Test that the commit hook fires when a transaction commits."""
719+ captured = []
720+
721+ def hook (ops , commit_seq ):
722+ captured .append ({"ops" : ops , "seq" : commit_seq })
723+ return 0
724+
725+ cf .set_commit_hook (hook )
726+
727+ with db .begin_txn () as txn :
728+ txn .put (cf , b"key1" , b"value1" )
729+ txn .commit ()
730+
731+ assert len (captured ) == 1
732+ assert len (captured [0 ]["ops" ]) == 1
733+ assert captured [0 ]["ops" ][0 ].key == b"key1"
734+ assert captured [0 ]["ops" ][0 ].value == b"value1"
735+ assert captured [0 ]["ops" ][0 ].is_delete is False
736+ assert captured [0 ]["seq" ] > 0
737+
738+ cf .clear_commit_hook ()
739+
740+ def test_hook_captures_deletes (self , db , cf ):
741+ """Test that the hook correctly reports delete operations."""
742+ captured = []
743+
744+ with db .begin_txn () as txn :
745+ txn .put (cf , b"del_key" , b"del_val" )
746+ txn .commit ()
747+
748+ def hook (ops , commit_seq ):
749+ captured .append (ops )
750+ return 0
751+
752+ cf .set_commit_hook (hook )
753+
754+ with db .begin_txn () as txn :
755+ txn .delete (cf , b"del_key" )
756+ txn .commit ()
757+
758+ assert len (captured ) == 1
759+ delete_ops = [op for op in captured [0 ] if op .is_delete ]
760+ assert len (delete_ops ) >= 1
761+ assert delete_ops [0 ].key == b"del_key"
762+ assert delete_ops [0 ].value is None
763+
764+ cf .clear_commit_hook ()
765+
766+ def test_hook_multi_op_batch (self , db , cf ):
767+ """Test that the hook receives all operations in a batch."""
768+ captured = []
769+
770+ def hook (ops , commit_seq ):
771+ captured .append (ops )
772+ return 0
773+
774+ cf .set_commit_hook (hook )
775+
776+ with db .begin_txn () as txn :
777+ txn .put (cf , b"batch1" , b"v1" )
778+ txn .put (cf , b"batch2" , b"v2" )
779+ txn .put (cf , b"batch3" , b"v3" )
780+ txn .commit ()
781+
782+ assert len (captured ) == 1
783+ assert len (captured [0 ]) == 3
784+ keys = {op .key for op in captured [0 ]}
785+ assert keys == {b"batch1" , b"batch2" , b"batch3" }
786+
787+ cf .clear_commit_hook ()
788+
789+ def test_hook_commit_seq_increases (self , db , cf ):
790+ """Test that commit_seq is monotonically increasing."""
791+ seqs = []
792+
793+ def hook (ops , commit_seq ):
794+ seqs .append (commit_seq )
795+ return 0
796+
797+ cf .set_commit_hook (hook )
798+
799+ for i in range (3 ):
800+ with db .begin_txn () as txn :
801+ txn .put (cf , f"seq_key_{ i } " .encode (), f"seq_val_{ i } " .encode ())
802+ txn .commit ()
803+
804+ assert len (seqs ) == 3
805+ assert seqs [0 ] < seqs [1 ] < seqs [2 ]
806+
807+ cf .clear_commit_hook ()
808+
809+ def test_clear_hook_stops_firing (self , db , cf ):
810+ """Test that clearing the hook stops callbacks."""
811+ captured = []
812+
813+ def hook (ops , commit_seq ):
814+ captured .append (ops )
815+ return 0
816+
817+ cf .set_commit_hook (hook )
818+
819+ with db .begin_txn () as txn :
820+ txn .put (cf , b"before_clear" , b"v" )
821+ txn .commit ()
822+
823+ assert len (captured ) == 1
824+
825+ cf .clear_commit_hook ()
826+
827+ with db .begin_txn () as txn :
828+ txn .put (cf , b"after_clear" , b"v" )
829+ txn .commit ()
830+
831+ # No new captures after clearing
832+ assert len (captured ) == 1
833+
834+ def test_hook_failure_does_not_rollback (self , db , cf ):
835+ """Test that a hook returning non-zero does not affect the commit."""
836+ def failing_hook (ops , commit_seq ):
837+ return - 1 # Simulate failure
838+
839+ cf .set_commit_hook (failing_hook )
840+
841+ with db .begin_txn () as txn :
842+ txn .put (cf , b"survive" , b"value" )
843+ txn .commit ()
844+
845+ cf .clear_commit_hook ()
846+
847+ # Data should still be committed despite hook failure
848+ with db .begin_txn () as txn :
849+ assert txn .get (cf , b"survive" ) == b"value"
850+
851+ def test_hook_with_ttl (self , db , cf ):
852+ """Test that the hook reports TTL values."""
853+ captured = []
854+
855+ def hook (ops , commit_seq ):
856+ captured .append (ops )
857+ return 0
858+
859+ cf .set_commit_hook (hook )
860+
861+ ttl_val = int (time .time ()) + 3600 # 1 hour from now
862+ with db .begin_txn () as txn :
863+ txn .put (cf , b"ttl_key" , b"ttl_val" , ttl = ttl_val )
864+ txn .commit ()
865+
866+ assert len (captured ) == 1
867+ assert captured [0 ][0 ].ttl == ttl_val
868+
869+ cf .clear_commit_hook ()
870+
871+ def test_hook_exception_handled (self , db , cf ):
872+ """Test that Python exceptions in the hook don't crash the process."""
873+ def crashing_hook (ops , commit_seq ):
874+ raise RuntimeError ("hook crashed!" )
875+
876+ cf .set_commit_hook (crashing_hook )
877+
878+ # Should not raise - exception is caught internally
879+ with db .begin_txn () as txn :
880+ txn .put (cf , b"crash_key" , b"crash_val" )
881+ txn .commit ()
882+
883+ cf .clear_commit_hook ()
884+
885+ # Data should still be committed
886+ with db .begin_txn () as txn :
887+ assert txn .get (cf , b"crash_key" ) == b"crash_val"
888+
889+
714890if __name__ == "__main__" :
715891 pytest .main ([__file__ , "-v" ])
0 commit comments