diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java index 1c5874031d49b3..f8a119efaa9ba4 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java @@ -17,7 +17,6 @@ package org.apache.doris.common.classloader; -import org.apache.doris.common.jni.utils.ExpiringMap; import org.apache.doris.common.jni.utils.UdfClassCache; import com.google.common.collect.Streams; @@ -37,6 +36,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.stream.Collectors; @@ -88,7 +88,21 @@ public class ScannerLoader { public static final Logger LOG = LogManager.getLogger(ScannerLoader.class); private static final Map> loadedClasses = new HashMap<>(); - private static final ExpiringMap udfLoadedClasses = new ExpiringMap<>(); + // Cache of UDF class metadata (including the URLClassLoader used to load the UDF). + // Entries are inserted on first use and only ever removed by an explicit + // cleanUdfClassLoader() call (triggered by FE on DROP FUNCTION). There is intentionally + // no time-based eviction: that previously caused two issues — + // 1) closing a URLClassLoader while another thread was still loading classes from it + // led to NoClassDefFoundError; + // 2) rebuilding a fresh URLClassLoader on every eviction produced multiple coexisting + // ClassLoaders for the same UDF, which broke lazy class resolution and reflective + // lookups inside user UDF code. + // NOTE: a cache miss in BaseExecutor.getClassCache() is NOT only reachable after + // cleanUdfClassLoader() — concurrent first-time loads of the same signature can also + // both observe a miss. cacheClassLoader() must therefore insert atomically via + // putIfAbsent and must never close a cache that was already published to the map, + // because another executor may already be holding it. + private static final Map udfLoadedClasses = new ConcurrentHashMap<>(); private static final String CLASS_SUFFIX = ".class"; private static final String LOAD_PACKAGE = "org.apache.doris"; @@ -116,15 +130,57 @@ public static UdfClassCache getUdfClassLoader(String functionSignature) { return udfLoadedClasses.get(functionSignature); } - public static synchronized void cacheClassLoader(String functionSignature, UdfClassCache classCache, + /** + * Cache the UDF class metadata for the given function signature. + * + *

Insertion is atomic via {@link Map#putIfAbsent}: if another executor thread has + * already published a cache entry for {@code functionSignature}, the {@code classCache} + * argument is treated as a redundant build and closed here (it has not yet been handed + * to any executor, so closing its URLClassLoader is safe). The already-published entry + * is returned to the caller so the current executor can switch to it.

+ * + *

The {@code expirationTime} parameter is kept for backward compatibility with the + * existing call sites and DDL property {@code expiration_time}, but is no longer used: + * cached entries are not evicted by time. Removal happens only via + * {@link #cleanUdfClassLoader(String)} on DROP FUNCTION.

+ * + * @return the {@link UdfClassCache} actually held in the map after this call — + * either {@code classCache} (we won the race) or the pre-existing entry + * (another thread won; {@code classCache} has been closed and must not be used). + */ + public static UdfClassCache cacheClassLoader(String functionSignature, UdfClassCache classCache, long expirationTime) { - LOG.info("Cache UDF for: {}", functionSignature); - udfLoadedClasses.put(functionSignature, classCache, expirationTime * 60 * 1000L); + LOG.info("Cache UDF for: " + functionSignature); + UdfClassCache existing = udfLoadedClasses.putIfAbsent(functionSignature, classCache); + if (existing == null) { + return classCache; + } + // Lost the race against a concurrent first-time load. The cache we just built has + // never been exposed to any executor, so closing its URLClassLoader here cannot + // affect anyone. Do NOT touch `existing` — another executor may already be using it. + try { + classCache.close(); + } catch (Exception e) { + LOG.warn("Failed to close redundant UdfClassCache for " + functionSignature, e); + } + return existing; } - public synchronized void cleanUdfClassLoader(String functionSignature) { - LOG.info("cleanUdfClassLoader for: {}", functionSignature); - udfLoadedClasses.remove(functionSignature); + public void cleanUdfClassLoader(String functionSignature) { + LOG.info("cleanUdfClassLoader for: " + functionSignature); + UdfClassCache removed = udfLoadedClasses.remove(functionSignature); + if (removed != null) { + // Immediately close the URLClassLoader. NOTE: any in-flight query still holding a + // reference to this cache (e.g. via JNIContext.executor) will fail with + // NoClassDefFoundError on lazy class resolution after this point. This is the + // accepted semantic of DROP FUNCTION: the function is gone, queries against it + // are expected to fail. + try { + removed.close(); + } catch (Exception e) { + LOG.warn("Failed to close UdfClassCache for " + functionSignature, e); + } + } } /** diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java deleted file mode 100644 index 3496d5bbb63eb6..00000000000000 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java +++ /dev/null @@ -1,100 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.jni.utils; - -import org.apache.log4j.Logger; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class ExpiringMap { - private final ConcurrentHashMap map = new ConcurrentHashMap<>(); // key --> value - private final ConcurrentHashMap ttlMap = new ConcurrentHashMap<>(); // key --> ttl interval - // key --> expirationTime(ttl interval + currentTimeMillis) - private final ConcurrentHashMap expirationMap = new ConcurrentHashMap<>(); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - private static final long DEFAULT_INTERVAL_TIME = 10 * 60 * 1000L; // 10 minutes - public static final Logger LOG = Logger.getLogger(ExpiringMap.class); - - public ExpiringMap() { - startExpirationTask(); - } - - public void put(K key, V value, long expirationTimeMs) { - long expirationTime = System.currentTimeMillis() + expirationTimeMs; - map.put(key, value); - expirationMap.put(key, expirationTime); - ttlMap.put(key, expirationTimeMs); - } - - public V get(K key) { - Long expirationTime = expirationMap.get(key); - if (expirationTime == null || System.currentTimeMillis() > expirationTime) { - remove(key); - return null; - } - // reset time again - long ttl = ttlMap.get(key); - long newExpirationTime = System.currentTimeMillis() + ttl; - expirationMap.put(key, newExpirationTime); - return map.get(key); - } - - private void startExpirationTask() { - scheduler.scheduleAtFixedRate(() -> { - long now = System.currentTimeMillis(); - for (K key : expirationMap.keySet()) { - if (expirationMap.get(key) <= now) { - remove(key); - } - } - }, DEFAULT_INTERVAL_TIME, DEFAULT_INTERVAL_TIME, TimeUnit.MINUTES); - } - - public void remove(K key) { - V value = map.remove(key); - expirationMap.remove(key); - ttlMap.remove(key); - - // Uniformly release resources for any AutoCloseable value, - if (value instanceof AutoCloseable) { - try { - ((AutoCloseable) value).close(); - } catch (Exception e) { - LOG.warn("Failed to close cached resource: " + key, e); - } - } - } - - public int size() { - return map.size(); - } - - public void shutdown() { - scheduler.shutdown(); - try { - if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) { - scheduler.shutdownNow(); - } - } catch (InterruptedException e) { - scheduler.shutdownNow(); - } - } -} diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java index 0967e9fde0bd07..6356576baaf573 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java @@ -138,8 +138,12 @@ public UdfClassCache getClassCache(String jarPath, String signature, long expira UdfClassCache cache = null; if (isStaticLoad) { cache = ScannerLoader.getUdfClassLoader(signature); - if (cache != null && cache.classLoader != null) { - // Reuse the cached classLoader to ensure dependent classes can be loaded + if (cache != null) { + // Reuse the cached classLoader to ensure dependent classes can be loaded. + // NOTE: cache.classLoader may be null when the UDF was originally loaded via + // the system class loader (jarPath empty / custom_lib UDF); see + // UdfClassCache#classLoader. A null value here is a valid cached state and + // must NOT trigger a rebuild — only an actual cache miss does. classLoader = cache.classLoader; } } @@ -162,7 +166,14 @@ public UdfClassCache getClassCache(String jarPath, String signature, long expira cache.classLoader = classLoader; checkAndCacheUdfClass(cache, funcRetType, parameterTypes); if (isStaticLoad) { - ScannerLoader.cacheClassLoader(signature, cache, expirationTime); + UdfClassCache effective = ScannerLoader.cacheClassLoader(signature, cache, expirationTime); + if (effective != cache) { + // Another thread won the publish race. Our locally-built cache (and its + // URLClassLoader) was already closed inside cacheClassLoader(); switch to + // the published one so we share its live classLoader. + cache = effective; + classLoader = cache.classLoader; + } } } return cache;