From dc359645c90d0dc3c45a09033db13ebfae62e94d Mon Sep 17 00:00:00 2001 From: zhangstar333 Date: Mon, 13 Apr 2026 17:23:50 +0800 Subject: [PATCH 1/4] update again (#8350) Issue Number: close #xxx --- .../org/apache/doris/common/jni/utils/ExpiringMap.java | 10 +++++++++- .../main/java/org/apache/doris/udf/BaseExecutor.java | 4 +++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java index 3496d5bbb63eb6..481ec833f9049e 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java @@ -42,11 +42,14 @@ public void put(K key, V value, long expirationTimeMs) { map.put(key, value); expirationMap.put(key, expirationTime); ttlMap.put(key, expirationTimeMs); + LOG.info("ExpiringMap put key=" + key + ", ttlMs=" + expirationTimeMs + + ", thread=" + Thread.currentThread().getName()); } public V get(K key) { Long expirationTime = expirationMap.get(key); if (expirationTime == null || System.currentTimeMillis() > expirationTime) { + LOG.info("ExpiringMap expired key=" + key + ", thread=" + Thread.currentThread().getName()); remove(key); return null; } @@ -54,6 +57,9 @@ public V get(K key) { long ttl = ttlMap.get(key); long newExpirationTime = System.currentTimeMillis() + ttl; expirationMap.put(key, newExpirationTime); + if (LOG.isDebugEnabled()) { + LOG.debug("ExpiringMap hit key=" + key + ", thread=" + Thread.currentThread().getName()); + } return map.get(key); } @@ -65,7 +71,7 @@ private void startExpirationTask() { remove(key); } } - }, DEFAULT_INTERVAL_TIME, DEFAULT_INTERVAL_TIME, TimeUnit.MINUTES); + }, DEFAULT_INTERVAL_TIME, DEFAULT_INTERVAL_TIME, TimeUnit.MILLISECONDS); } public void remove(K key) { @@ -73,6 +79,8 @@ public void remove(K key) { expirationMap.remove(key); ttlMap.remove(key); + LOG.info("ExpiringMap remove key=" + key + ", hasValue=" + (value != null) + + ", thread=" + Thread.currentThread().getName()); // Uniformly release resources for any AutoCloseable value, if (value instanceof AutoCloseable) { try { diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java index 0967e9fde0bd07..1a3ee487ca248d 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java @@ -143,7 +143,9 @@ public UdfClassCache getClassCache(String jarPath, String signature, long expira classLoader = cache.classLoader; } } - if (cache == null) { + // Rebuild if cache is missing or if the cached classLoader was closed by a concurrent + // ExpiringMap expiration (cache != null but classLoader == null). + if (cache == null || cache.classLoader == null) { ClassLoader loader; if (Strings.isNullOrEmpty(jarPath)) { // if jarPath is empty, which means the UDF jar is located in custom_lib From 6ed9ddaa800f6e1470788da571452f4a0add8f65 Mon Sep 17 00:00:00 2001 From: zhangstar333 Date: Mon, 20 Apr 2026 11:27:52 +0800 Subject: [PATCH 2/4] update remove close (#8481) ## Proposed changes Issue Number: close #xxx --- .../apache/doris/common/jni/utils/ExpiringMap.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java index 481ec833f9049e..07e017391e7ee1 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java @@ -81,14 +81,10 @@ public void remove(K key) { LOG.info("ExpiringMap remove key=" + key + ", hasValue=" + (value != null) + ", thread=" + Thread.currentThread().getName()); - // Uniformly release resources for any AutoCloseable value, - if (value instanceof AutoCloseable) { - try { - ((AutoCloseable) value).close(); - } catch (Exception e) { - LOG.warn("Failed to close cached resource: " + key, e); - } - } + // Do NOT call close() on eviction. The value (e.g., UdfClassCache holding a URLClassLoader) + // may still be in use by concurrent threads. Closing the URLClassLoader while another thread + // is still loading classes from it causes NoClassDefFoundError. + // Instead, let the value be garbage collected naturally when no references remain. } public int size() { From a1f51131398cb5d208d4d08d1ac36f384731787f Mon Sep 17 00:00:00 2001 From: zhangstar333 Date: Tue, 28 Apr 2026 21:03:03 +0800 Subject: [PATCH 3/4] remove expirationTime --- .../common/classloader/ScannerLoader.java | 56 ++++++++-- .../doris/common/jni/utils/ExpiringMap.java | 104 ------------------ 2 files changed, 48 insertions(+), 112 deletions(-) delete mode 100644 fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java index 1c5874031d49b3..a09c1610d9e125 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java @@ -17,7 +17,6 @@ package org.apache.doris.common.classloader; -import org.apache.doris.common.jni.utils.ExpiringMap; import org.apache.doris.common.jni.utils.UdfClassCache; import com.google.common.collect.Streams; @@ -37,6 +36,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.stream.Collectors; @@ -88,7 +88,16 @@ public class ScannerLoader { public static final Logger LOG = LogManager.getLogger(ScannerLoader.class); private static final Map> loadedClasses = new HashMap<>(); - private static final ExpiringMap udfLoadedClasses = new ExpiringMap<>(); + // Cache of UDF class metadata (including the URLClassLoader used to load the UDF). + // Entries are inserted on first use and only ever removed by an explicit + // cleanUdfClassLoader() call (triggered by FE on DROP FUNCTION). There is intentionally + // no time-based eviction: that previously caused two issues — + // 1) closing a URLClassLoader while another thread was still loading classes from it + // led to NoClassDefFoundError; + // 2) rebuilding a fresh URLClassLoader on every eviction produced multiple coexisting + // ClassLoaders for the same UDF, which broke lazy class resolution and reflective + // lookups inside user UDF code. + private static final Map udfLoadedClasses = new ConcurrentHashMap<>(); private static final String CLASS_SUFFIX = ".class"; private static final String LOAD_PACKAGE = "org.apache.doris"; @@ -116,15 +125,46 @@ public static UdfClassCache getUdfClassLoader(String functionSignature) { return udfLoadedClasses.get(functionSignature); } - public static synchronized void cacheClassLoader(String functionSignature, UdfClassCache classCache, + /** + * Cache the UDF class metadata for the given function signature. + * The {@code expirationTime} parameter is kept for backward compatibility with the + * existing call sites and DDL property {@code expiration_time}, but is no longer used: + * cached entries are not evicted by time. Removal happens only via + * {@link #cleanUdfClassLoader(String)} on DROP FUNCTION. + */ + public static void cacheClassLoader(String functionSignature, UdfClassCache classCache, long expirationTime) { - LOG.info("Cache UDF for: {}", functionSignature); - udfLoadedClasses.put(functionSignature, classCache, expirationTime * 60 * 1000L); + LOG.info("Cache UDF for: " + functionSignature); + UdfClassCache previous = udfLoadedClasses.put(functionSignature, classCache); + if (previous != null && previous != classCache) { + // A previous entry existed; close it now to avoid leaking the URLClassLoader. + // No live executor should still be holding it because the cache miss path that + // led us here only fires when getUdfClassLoader() returned null — which only + // happens after an explicit cleanUdfClassLoader() removed the previous entry. + // Defensive close in case callers race. + try { + previous.close(); + } catch (Exception e) { + LOG.warn("Failed to close previous UdfClassCache for " + functionSignature, e); + } + } } - public synchronized void cleanUdfClassLoader(String functionSignature) { - LOG.info("cleanUdfClassLoader for: {}", functionSignature); - udfLoadedClasses.remove(functionSignature); + public void cleanUdfClassLoader(String functionSignature) { + LOG.info("cleanUdfClassLoader for: " + functionSignature); + UdfClassCache removed = udfLoadedClasses.remove(functionSignature); + if (removed != null) { + // Immediately close the URLClassLoader. NOTE: any in-flight query still holding a + // reference to this cache (e.g. via JNIContext.executor) will fail with + // NoClassDefFoundError on lazy class resolution after this point. This is the + // accepted semantic of DROP FUNCTION: the function is gone, queries against it + // are expected to fail. + try { + removed.close(); + } catch (Exception e) { + LOG.warn("Failed to close UdfClassCache for " + functionSignature, e); + } + } } /** diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java deleted file mode 100644 index 07e017391e7ee1..00000000000000 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java +++ /dev/null @@ -1,104 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.jni.utils; - -import org.apache.log4j.Logger; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class ExpiringMap { - private final ConcurrentHashMap map = new ConcurrentHashMap<>(); // key --> value - private final ConcurrentHashMap ttlMap = new ConcurrentHashMap<>(); // key --> ttl interval - // key --> expirationTime(ttl interval + currentTimeMillis) - private final ConcurrentHashMap expirationMap = new ConcurrentHashMap<>(); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - private static final long DEFAULT_INTERVAL_TIME = 10 * 60 * 1000L; // 10 minutes - public static final Logger LOG = Logger.getLogger(ExpiringMap.class); - - public ExpiringMap() { - startExpirationTask(); - } - - public void put(K key, V value, long expirationTimeMs) { - long expirationTime = System.currentTimeMillis() + expirationTimeMs; - map.put(key, value); - expirationMap.put(key, expirationTime); - ttlMap.put(key, expirationTimeMs); - LOG.info("ExpiringMap put key=" + key + ", ttlMs=" + expirationTimeMs - + ", thread=" + Thread.currentThread().getName()); - } - - public V get(K key) { - Long expirationTime = expirationMap.get(key); - if (expirationTime == null || System.currentTimeMillis() > expirationTime) { - LOG.info("ExpiringMap expired key=" + key + ", thread=" + Thread.currentThread().getName()); - remove(key); - return null; - } - // reset time again - long ttl = ttlMap.get(key); - long newExpirationTime = System.currentTimeMillis() + ttl; - expirationMap.put(key, newExpirationTime); - if (LOG.isDebugEnabled()) { - LOG.debug("ExpiringMap hit key=" + key + ", thread=" + Thread.currentThread().getName()); - } - return map.get(key); - } - - private void startExpirationTask() { - scheduler.scheduleAtFixedRate(() -> { - long now = System.currentTimeMillis(); - for (K key : expirationMap.keySet()) { - if (expirationMap.get(key) <= now) { - remove(key); - } - } - }, DEFAULT_INTERVAL_TIME, DEFAULT_INTERVAL_TIME, TimeUnit.MILLISECONDS); - } - - public void remove(K key) { - V value = map.remove(key); - expirationMap.remove(key); - ttlMap.remove(key); - - LOG.info("ExpiringMap remove key=" + key + ", hasValue=" + (value != null) - + ", thread=" + Thread.currentThread().getName()); - // Do NOT call close() on eviction. The value (e.g., UdfClassCache holding a URLClassLoader) - // may still be in use by concurrent threads. Closing the URLClassLoader while another thread - // is still loading classes from it causes NoClassDefFoundError. - // Instead, let the value be garbage collected naturally when no references remain. - } - - public int size() { - return map.size(); - } - - public void shutdown() { - scheduler.shutdown(); - try { - if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) { - scheduler.shutdownNow(); - } - } catch (InterruptedException e) { - scheduler.shutdownNow(); - } - } -} From 599f7b7cd3e0ac61b8ab16bfd113d28c9cdd8a4e Mon Sep 17 00:00:00 2001 From: zhangstar333 Date: Fri, 29 May 2026 16:28:15 +0800 Subject: [PATCH 4/4] update --- .../common/classloader/ScannerLoader.java | 46 +++++++++++++------ .../org/apache/doris/udf/BaseExecutor.java | 21 ++++++--- 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java index a09c1610d9e125..f8a119efaa9ba4 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java @@ -97,6 +97,11 @@ public class ScannerLoader { // 2) rebuilding a fresh URLClassLoader on every eviction produced multiple coexisting // ClassLoaders for the same UDF, which broke lazy class resolution and reflective // lookups inside user UDF code. + // NOTE: a cache miss in BaseExecutor.getClassCache() is NOT only reachable after + // cleanUdfClassLoader() — concurrent first-time loads of the same signature can also + // both observe a miss. cacheClassLoader() must therefore insert atomically via + // putIfAbsent and must never close a cache that was already published to the map, + // because another executor may already be holding it. private static final Map udfLoadedClasses = new ConcurrentHashMap<>(); private static final String CLASS_SUFFIX = ".class"; private static final String LOAD_PACKAGE = "org.apache.doris"; @@ -127,27 +132,38 @@ public static UdfClassCache getUdfClassLoader(String functionSignature) { /** * Cache the UDF class metadata for the given function signature. - * The {@code expirationTime} parameter is kept for backward compatibility with the + * + *

Insertion is atomic via {@link Map#putIfAbsent}: if another executor thread has + * already published a cache entry for {@code functionSignature}, the {@code classCache} + * argument is treated as a redundant build and closed here (it has not yet been handed + * to any executor, so closing its URLClassLoader is safe). The already-published entry + * is returned to the caller so the current executor can switch to it.

+ * + *

The {@code expirationTime} parameter is kept for backward compatibility with the * existing call sites and DDL property {@code expiration_time}, but is no longer used: * cached entries are not evicted by time. Removal happens only via - * {@link #cleanUdfClassLoader(String)} on DROP FUNCTION. + * {@link #cleanUdfClassLoader(String)} on DROP FUNCTION.

+ * + * @return the {@link UdfClassCache} actually held in the map after this call — + * either {@code classCache} (we won the race) or the pre-existing entry + * (another thread won; {@code classCache} has been closed and must not be used). */ - public static void cacheClassLoader(String functionSignature, UdfClassCache classCache, + public static UdfClassCache cacheClassLoader(String functionSignature, UdfClassCache classCache, long expirationTime) { LOG.info("Cache UDF for: " + functionSignature); - UdfClassCache previous = udfLoadedClasses.put(functionSignature, classCache); - if (previous != null && previous != classCache) { - // A previous entry existed; close it now to avoid leaking the URLClassLoader. - // No live executor should still be holding it because the cache miss path that - // led us here only fires when getUdfClassLoader() returned null — which only - // happens after an explicit cleanUdfClassLoader() removed the previous entry. - // Defensive close in case callers race. - try { - previous.close(); - } catch (Exception e) { - LOG.warn("Failed to close previous UdfClassCache for " + functionSignature, e); - } + UdfClassCache existing = udfLoadedClasses.putIfAbsent(functionSignature, classCache); + if (existing == null) { + return classCache; + } + // Lost the race against a concurrent first-time load. The cache we just built has + // never been exposed to any executor, so closing its URLClassLoader here cannot + // affect anyone. Do NOT touch `existing` — another executor may already be using it. + try { + classCache.close(); + } catch (Exception e) { + LOG.warn("Failed to close redundant UdfClassCache for " + functionSignature, e); } + return existing; } public void cleanUdfClassLoader(String functionSignature) { diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java index 1a3ee487ca248d..6356576baaf573 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java @@ -138,14 +138,16 @@ public UdfClassCache getClassCache(String jarPath, String signature, long expira UdfClassCache cache = null; if (isStaticLoad) { cache = ScannerLoader.getUdfClassLoader(signature); - if (cache != null && cache.classLoader != null) { - // Reuse the cached classLoader to ensure dependent classes can be loaded + if (cache != null) { + // Reuse the cached classLoader to ensure dependent classes can be loaded. + // NOTE: cache.classLoader may be null when the UDF was originally loaded via + // the system class loader (jarPath empty / custom_lib UDF); see + // UdfClassCache#classLoader. A null value here is a valid cached state and + // must NOT trigger a rebuild — only an actual cache miss does. classLoader = cache.classLoader; } } - // Rebuild if cache is missing or if the cached classLoader was closed by a concurrent - // ExpiringMap expiration (cache != null but classLoader == null). - if (cache == null || cache.classLoader == null) { + if (cache == null) { ClassLoader loader; if (Strings.isNullOrEmpty(jarPath)) { // if jarPath is empty, which means the UDF jar is located in custom_lib @@ -164,7 +166,14 @@ public UdfClassCache getClassCache(String jarPath, String signature, long expira cache.classLoader = classLoader; checkAndCacheUdfClass(cache, funcRetType, parameterTypes); if (isStaticLoad) { - ScannerLoader.cacheClassLoader(signature, cache, expirationTime); + UdfClassCache effective = ScannerLoader.cacheClassLoader(signature, cache, expirationTime); + if (effective != cache) { + // Another thread won the publish race. Our locally-built cache (and its + // URLClassLoader) was already closed inside cacheClassLoader(); switch to + // the published one so we share its live classLoader. + cache = effective; + classLoader = cache.classLoader; + } } } return cache;