Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.common.classloader;

import org.apache.doris.common.jni.utils.ExpiringMap;
import org.apache.doris.common.jni.utils.UdfClassCache;

import com.google.common.collect.Streams;
Expand All @@ -37,6 +36,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -88,7 +88,21 @@ public class ScannerLoader {

public static final Logger LOG = LogManager.getLogger(ScannerLoader.class);
private static final Map<String, Class<?>> loadedClasses = new HashMap<>();
private static final ExpiringMap<String, UdfClassCache> udfLoadedClasses = new ExpiringMap<>();
// Cache of UDF class metadata (including the URLClassLoader used to load the UDF).
// Entries are inserted on first use and only ever removed by an explicit
// cleanUdfClassLoader() call (triggered by FE on DROP FUNCTION). There is intentionally
// no time-based eviction: that previously caused two issues —
// 1) closing a URLClassLoader while another thread was still loading classes from it
// led to NoClassDefFoundError;
// 2) rebuilding a fresh URLClassLoader on every eviction produced multiple coexisting
// ClassLoaders for the same UDF, which broke lazy class resolution and reflective
// lookups inside user UDF code.
// NOTE: a cache miss in BaseExecutor.getClassCache() is NOT only reachable after
// cleanUdfClassLoader() — concurrent first-time loads of the same signature can also
// both observe a miss. cacheClassLoader() must therefore insert atomically via
// putIfAbsent and must never close a cache that was already published to the map,
// because another executor may already be holding it.
private static final Map<String, UdfClassCache> udfLoadedClasses = new ConcurrentHashMap<>();
private static final String CLASS_SUFFIX = ".class";
private static final String LOAD_PACKAGE = "org.apache.doris";

Expand Down Expand Up @@ -116,15 +130,57 @@ public static UdfClassCache getUdfClassLoader(String functionSignature) {
return udfLoadedClasses.get(functionSignature);
}

public static synchronized void cacheClassLoader(String functionSignature, UdfClassCache classCache,
/**
* Cache the UDF class metadata for the given function signature.
*
* <p>Insertion is atomic via {@link Map#putIfAbsent}: if another executor thread has
* already published a cache entry for {@code functionSignature}, the {@code classCache}
* argument is treated as a redundant build and closed here (it has not yet been handed
* to any executor, so closing its URLClassLoader is safe). The already-published entry
* is returned to the caller so the current executor can switch to it.</p>
*
* <p>The {@code expirationTime} parameter is kept for backward compatibility with the
* existing call sites and DDL property {@code expiration_time}, but is no longer used:
* cached entries are not evicted by time. Removal happens only via
* {@link #cleanUdfClassLoader(String)} on DROP FUNCTION.</p>
*
* @return the {@link UdfClassCache} actually held in the map after this call —
* either {@code classCache} (we won the race) or the pre-existing entry
* (another thread won; {@code classCache} has been closed and must not be used).
*/
public static UdfClassCache cacheClassLoader(String functionSignature, UdfClassCache classCache,
long expirationTime) {
LOG.info("Cache UDF for: {}", functionSignature);
Comment thread
zhangstar333 marked this conversation as resolved.
udfLoadedClasses.put(functionSignature, classCache, expirationTime * 60 * 1000L);
LOG.info("Cache UDF for: " + functionSignature);
UdfClassCache existing = udfLoadedClasses.putIfAbsent(functionSignature, classCache);
if (existing == null) {
return classCache;
}
// Lost the race against a concurrent first-time load. The cache we just built has
// never been exposed to any executor, so closing its URLClassLoader here cannot
// affect anyone. Do NOT touch `existing` — another executor may already be using it.
try {
classCache.close();
} catch (Exception e) {
LOG.warn("Failed to close redundant UdfClassCache for " + functionSignature, e);
}
return existing;
}

public synchronized void cleanUdfClassLoader(String functionSignature) {
LOG.info("cleanUdfClassLoader for: {}", functionSignature);
udfLoadedClasses.remove(functionSignature);
public void cleanUdfClassLoader(String functionSignature) {
LOG.info("cleanUdfClassLoader for: " + functionSignature);
UdfClassCache removed = udfLoadedClasses.remove(functionSignature);
if (removed != null) {
// Immediately close the URLClassLoader. NOTE: any in-flight query still holding a
// reference to this cache (e.g. via JNIContext.executor) will fail with
// NoClassDefFoundError on lazy class resolution after this point. This is the
// accepted semantic of DROP FUNCTION: the function is gone, queries against it
// are expected to fail.
try {
removed.close();
} catch (Exception e) {
LOG.warn("Failed to close UdfClassCache for " + functionSignature, e);
}
}
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,12 @@ public UdfClassCache getClassCache(String jarPath, String signature, long expira
UdfClassCache cache = null;
if (isStaticLoad) {
cache = ScannerLoader.getUdfClassLoader(signature);
if (cache != null && cache.classLoader != null) {
// Reuse the cached classLoader to ensure dependent classes can be loaded
if (cache != null) {
// Reuse the cached classLoader to ensure dependent classes can be loaded.
// NOTE: cache.classLoader may be null when the UDF was originally loaded via
// the system class loader (jarPath empty / custom_lib UDF); see
// UdfClassCache#classLoader. A null value here is a valid cached state and
// must NOT trigger a rebuild — only an actual cache miss does.
classLoader = cache.classLoader;
}
}
Expand All @@ -162,7 +166,14 @@ public UdfClassCache getClassCache(String jarPath, String signature, long expira
cache.classLoader = classLoader;
checkAndCacheUdfClass(cache, funcRetType, parameterTypes);
if (isStaticLoad) {
ScannerLoader.cacheClassLoader(signature, cache, expirationTime);
UdfClassCache effective = ScannerLoader.cacheClassLoader(signature, cache, expirationTime);
if (effective != cache) {
// Another thread won the publish race. Our locally-built cache (and its
// URLClassLoader) was already closed inside cacheClassLoader(); switch to
// the published one so we share its live classLoader.
cache = effective;
classLoader = cache.classLoader;
}
}
}
return cache;
Expand Down
Loading