@@ -541,3 +541,50 @@ def test_table_comment_reflection(self, inspector: Inspector, table: Table):
541541 def test_column_comment (self , inspector : Inspector , table : Table ):
542542 result = inspector .get_columns (table .name )[0 ].get ("comment" )
543543 assert result == "column comment"
544+
545+
546+ def test_pool_pre_ping_with_closed_connection (connection_details ):
547+ """Test that pool_pre_ping detects closed connections and creates new ones.
548+
549+ When a pooled connection is closed (simulating session expiration),
550+ do_ping() detects it and SQLAlchemy creates a new connection.
551+ """
552+ conn_string , connect_args = version_agnostic_connect_arguments (connection_details )
553+
554+ engine = create_engine (
555+ conn_string ,
556+ connect_args = connect_args ,
557+ pool_pre_ping = True ,
558+ pool_size = 1 ,
559+ max_overflow = 0 ,
560+ )
561+
562+ # Step 1: Use a connection and record its session ID
563+ with engine .connect () as conn :
564+ result = conn .execute (text ("SELECT VERSION()" )).scalar ()
565+ assert result is not None
566+
567+ raw_conn = conn .connection .dbapi_connection
568+ session_id_1 = raw_conn .get_session_id_hex ()
569+ assert session_id_1 is not None
570+
571+ # Step 2: Close the pooled connection to simulate session expiration
572+ pooled_conn = engine .pool ._pool .queue [0 ]
573+ pooled_conn .driver_connection .close ()
574+ assert not pooled_conn .driver_connection .open
575+
576+ # Step 3: pool_pre_ping should detect the dead connection and create a new one
577+ with engine .connect () as conn :
578+ result = conn .execute (text ("SELECT VERSION()" )).scalar ()
579+ assert result is not None
580+
581+ raw_conn = conn .connection .dbapi_connection
582+ session_id_2 = raw_conn .get_session_id_hex ()
583+ assert session_id_2 is not None
584+
585+ assert session_id_1 != session_id_2 , (
586+ "pool_pre_ping should have detected the closed connection "
587+ "and created a new one with a different session ID"
588+ )
589+
590+ engine .dispose ()
0 commit comments