From 81b02813836a396aeb4ede0f20ebf8aecb9bec19 Mon Sep 17 00:00:00 2001 From: yujun Date: Fri, 29 May 2026 11:52:24 +0800 Subject: [PATCH] [fix](session) Reset session state for COM_RESET_CONNECTION COM_RESET_CONNECTION should clear session-scoped state without losing user-level default session settings such as query and insert timeout. Key changes: - Reset session variables, user variables, prepared statements, catalog/database state, running query, and return rows on COM_RESET_CONNECTION. - Reapply user default query_timeout and insert_timeout after rebuilding SessionVariable. - Add a regression helper to send COM_RESET_CONNECTION directly through the MySQL protocol. Unit Test: - ConnectContextTest - test_reset_connection_session_variable --- .../org/apache/doris/qe/ConnectContext.java | 46 +++++++++++++++++-- .../org/apache/doris/qe/ConnectProcessor.java | 4 +- .../apache/doris/qe/ConnectContextTest.java | 46 +++++++++++++++++++ .../regression/suite/SuiteContext.groovy | 16 +++++++ ...t_reset_connection_session_variable.groovy | 28 +++++++++++ 5 files changed, 133 insertions(+), 7 deletions(-) create mode 100644 regression-test/suites/query_p0/session_variable/test_reset_connection_session_variable.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 3cf9b29c16b096..062f12fa28215f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -56,6 +56,7 @@ import org.apache.doris.mysql.MysqlHandshakePacket; import org.apache.doris.mysql.MysqlSslContext; import org.apache.doris.mysql.ProxyMysqlChannel; +import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.stats.StatsErrorEstimator; @@ -369,6 +370,46 @@ public void clearLastDBOfCatalog() { lastDBOfCatalog.clear(); } + public void resetConnection() { + closeTxn(); + if (!dbToTempTableNamesMap.isEmpty()) { + deleteTempTable(); + } + dbToTempTableNamesMap.clear(); + resetSessionVariable(); + userVars = new HashMap<>(); + preparedQuerys.clear(); + preparedStatementContextMap.clear(); + preparedStmtId = Integer.MIN_VALUE; + runningQuery = null; + changeDefaultCatalog(InternalCatalog.INTERNAL_CATALOG_NAME); + clearLastDBOfCatalog(); + command = MysqlCommand.COM_SLEEP; + returnRows = 0; + } + + private void resetSessionVariable() { + sessionVariable = VariableMgr.newSessionVariable(); + applyUserSessionVariableDefaults(); + if (Config.use_fuzzy_session_variable) { + sessionVariable.initFuzzyModeVariables(); + } + } + + private void applyUserSessionVariableDefaults() { + String qualifiedUser = getQualifiedUser(); + if (Strings.isNullOrEmpty(qualifiedUser)) { + return; + } + Env currentEnv = env == null ? Env.getCurrentEnv() : env; + Auth auth = currentEnv == null ? null : currentEnv.getAuth(); + if (auth == null) { + return; + } + setUserQueryTimeout(auth.getQueryTimeout(qualifiedUser)); + setUserInsertTimeout(auth.getInsertTimeout(qualifiedUser)); + } + public void setNotEvalNondeterministicFunction(boolean notEvalNondeterministicFunction) { this.notEvalNondeterministicFunction = notEvalNondeterministicFunction; } @@ -385,12 +426,9 @@ public void init() { state = new QueryState(); returnRows = 0; isKilled = false; - sessionVariable = VariableMgr.newSessionVariable(); + resetSessionVariable(); userVars = new HashMap<>(); command = MysqlCommand.COM_SLEEP; - if (Config.use_fuzzy_session_variable) { - sessionVariable.initFuzzyModeVariables(); - } sessionId = UUID.randomUUID().toString(); if (!FeConstants.runningUnitTest) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 8cd734ceda89c2..43328b22006dda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -50,7 +50,6 @@ import org.apache.doris.common.util.SqlUtils; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; @@ -155,8 +154,7 @@ protected void handleDebug() { } protected void handleResetConnection() { - ctx.changeDefaultCatalog(InternalCatalog.INTERNAL_CATALOG_NAME); - ctx.clearLastDBOfCatalog(); + ctx.resetConnection(); ctx.getState().setOk(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java index 090adbb608991a..91c67ffb8e31e9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java @@ -18,6 +18,8 @@ package org.apache.doris.qe; import org.apache.doris.analysis.ResourceTypeEnum; +import org.apache.doris.analysis.SetVar; +import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Env; import org.apache.doris.cloud.qe.ComputeGroupException; @@ -73,6 +75,50 @@ public void setUp() throws Exception { Mockito.when(catalogMgr.getCatalog(Mockito.anyString())).thenReturn(internalCatalog); } + @Test + public void testResetConnectionClearsSessionState() throws Exception { + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(env); + ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp("testUser", "%")); + Mockito.when(env.getAuth()).thenReturn(auth); + Mockito.when(auth.getQueryTimeout("testUser")).thenReturn(123); + Mockito.when(auth.getInsertTimeout("testUser")).thenReturn(456); + ctx.setUserQueryTimeout(123); + ctx.setUserInsertTimeout(456); + VariableMgr.setVar(ctx.getSessionVariable(), + new SetVar(SessionVariable.SQL_SELECT_LIMIT, new StringLiteral("0"))); + ctx.getSessionVariable().setQueryTimeoutS(1); + ctx.getSessionVariable().setInsertTimeoutS(2); + ctx.setUserVar("user_var", new StringLiteral("value")); + ctx.changeDefaultCatalog("external_catalog"); + ctx.currentDb = "test_db"; + ctx.currentDbId = 10; + ctx.addLastDBOfCatalog("external_catalog", "test_db"); + ctx.addPreparedQuery("1", "select 1"); + ctx.setRunningQuery("select 1"); + ctx.setCommand(MysqlCommand.COM_QUERY); + ctx.updateReturnRows(10); + + Assert.assertEquals(0, ctx.getSessionVariable().getSqlSelectLimit()); + Assert.assertEquals(1, ctx.getSessionVariable().getQueryTimeoutS()); + Assert.assertEquals(2, ctx.getSessionVariable().getInsertTimeoutS()); + Assert.assertFalse(ctx.getUserVars().isEmpty()); + + ctx.resetConnection(); + + Assert.assertEquals(-1, ctx.getSessionVariable().getSqlSelectLimit()); + Assert.assertEquals(123, ctx.getSessionVariable().getQueryTimeoutS()); + Assert.assertEquals(456, ctx.getSessionVariable().getInsertTimeoutS()); + Assert.assertTrue(ctx.getUserVars().isEmpty()); + Assert.assertEquals(InternalCatalog.INTERNAL_CATALOG_NAME, ctx.getDefaultCatalog()); + Assert.assertEquals("", ctx.getDatabase()); + Assert.assertNull(ctx.getLastDBOfCatalog("external_catalog")); + Assert.assertNull(ctx.getPreparedQuery("1")); + Assert.assertNull(ctx.getRunningQuery()); + Assert.assertEquals(MysqlCommand.COM_SLEEP, ctx.getCommand()); + Assert.assertEquals(0, ctx.getReturnRows()); + } + @Test public void testNormal() { try (MockedStatic mockedEnv = Mockito.mockStatic(Env.class)) { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy index 545b388c5a34e7..0d599aed817ae6 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy @@ -18,6 +18,10 @@ package org.apache.doris.regression.suite import com.google.common.collect.Maps +import com.mysql.cj.NativeSession +import com.mysql.cj.jdbc.JdbcConnection +import com.mysql.cj.protocol.a.NativeConstants +import com.mysql.cj.protocol.a.NativePacketPayload import groovy.transform.CompileStatic import org.apache.doris.regression.Config import org.apache.doris.regression.util.OutputUtils @@ -437,6 +441,18 @@ class SuiteContext implements Closeable { connectTo(connInfo.conn.getMetaData().getURL(), connInfo.username, connInfo.password); } + public void resetConnection() { + ConnectionInfo connInfo = threadLocalConn.get() + if (connInfo == null) { + return + } + NativeSession session = (NativeSession) connInfo.conn.unwrap(JdbcConnection.class).getSession() + // COM_RESET_CONNECTION has no payload besides the command byte. + NativePacketPayload packet = new NativePacketPayload(1) + packet.writeInteger(NativeConstants.IntegerDataType.INT1, 0x1f) + session.sendCommand(packet, false, 0) + } + public void connectTo(String url, String username, String password) { ConnectionInfo oldConn = threadLocalConn.get() if (oldConn != null) { diff --git a/regression-test/suites/query_p0/session_variable/test_reset_connection_session_variable.groovy b/regression-test/suites/query_p0/session_variable/test_reset_connection_session_variable.groovy new file mode 100644 index 00000000000000..a45138fc95aaeb --- /dev/null +++ b/regression-test/suites/query_p0/session_variable/test_reset_connection_session_variable.groovy @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_reset_connection_session_variable", "p0") { + sql "set sql_select_limit = 0" + + def limitedResult = sql "select 1 union all select 2" + assertEquals(0, limitedResult.size()) + + resetConnection() + + def resetResult = sql "select 1 union all select 2" + assertEquals(2, resetResult.size()) +}