org.apache.activemq
artemis-ra
diff --git a/artemis-lockmanager/artemis-kube-lock/pom.xml b/artemis-lockmanager/artemis-kube-lock/pom.xml
new file mode 100644
index 00000000000..5cdc69ee093
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/pom.xml
@@ -0,0 +1,46 @@
+
+
+ 4.0.0
+
+
+ org.apache.artemis
+ artemis-lockmanager
+ 2.55.0-SNAPSHOT
+
+
+ artemis-kube-lock
+ bundle
+ Kubernetes Lock Manager
+
+
+ ${project.basedir}/../..
+
+
+
+
+ org.apache.artemis
+ artemis-lockmanager-api
+
+
+ org.apache.artemis
+ artemis-commons
+
+
+
+
+
diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLock.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLock.java
new file mode 100644
index 00000000000..7238c56e505
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLock.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.artemis.lock.kube;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.time.Duration;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+
+import org.apache.activemq.artemis.json.JsonObject;
+import org.apache.activemq.artemis.lockmanager.DistributedLock;
+import org.apache.activemq.artemis.lockmanager.UnavailableStateException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubeLock implements DistributedLock {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ final String hostname;
+ final String namespace;
+ final String id;
+ private final long leasePeriodSeconds;
+
+ private final KubeRestClient kubeClient;
+
+
+ public KubeLock(String hostname, String namespace, String id, long leasePeriodSeconds, String apiServer, String token) {
+ this.hostname = hostname;
+ this.namespace = namespace;
+ this.id = id;
+ this.leasePeriodSeconds = leasePeriodSeconds;
+ this.kubeClient = new KubeRestClient(apiServer, token);
+ }
+
+ @Override
+ public String getLockId() {
+ return id;
+ }
+
+ @Override
+ public boolean isHeldByCaller() throws UnavailableStateException {
+ return renewLock();
+ }
+
+ private boolean renewLock() throws UnavailableStateException {
+ try {
+ // Try to read the existing lease
+ String path = "/apis/coordination.k8s.io/v1/namespaces/" + namespace + "/leases/" + id;
+ JsonObject existingLease = kubeClient.get(path);
+
+ if (existingLease == null) {
+ logger.debug("Create lock");
+ return createLock();
+ }
+
+ JsonObject spec = existingLease.getJsonObject("spec");
+ if (spec == null) {
+ logger.warn("Lease spec is null");
+ return false;
+ }
+
+ String holderIdentity = spec.getString("holderIdentity", null);
+ String renewTimeStr = spec.getString("renewTime", null);
+ int leaseDuration = spec.getInt("leaseDurationSeconds", (int) leasePeriodSeconds);
+
+ logger.debug("renewLock, Read lease: renewTime={}, holderIdentity={}, leaseDuration={}",
+ renewTimeStr, holderIdentity, leaseDuration);
+
+ // Check if we already hold this lease
+ if (hostname.equals(holderIdentity)) {
+ // Renew the lease
+ String newRenewTime = OffsetDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+ String updatedJson = buildLeaseJson(existingLease, holderIdentity, spec.getString("acquireTime", newRenewTime), newRenewTime, leaseDuration);
+ kubeClient.put(path, updatedJson);
+ return true;
+ }
+
+ // Check if the lease has expired by using the leaseDurationSeconds from the lease spec
+ if (renewTimeStr != null) {
+ OffsetDateTime renewTime = OffsetDateTime.parse(renewTimeStr);
+ OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("renew period:: {}, now = {}, between={}, between seconds={}", renewTime, now, Duration.between(renewTime, now), Duration.between(renewTime, now).toSeconds());
+ }
+ long ageSeconds = Duration.between(renewTime, now).toSeconds();
+
+ if (ageSeconds > leaseDuration) {
+ // Lease has expired, try to acquire it
+ OffsetDateTime newTime = OffsetDateTime.now(ZoneOffset.UTC);
+ String newRenewTime = newTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+ String newAcquireTime = newTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+
+ logger.debug("acquiring expired lease. Setting renewTime = {}, holderIdentity = {}, leaseDuration = {}", newRenewTime, hostname, leasePeriodSeconds);
+
+ String updatedJson = buildLeaseJson(existingLease, hostname, newAcquireTime, newRenewTime, (int) leasePeriodSeconds);
+ kubeClient.put(path, updatedJson);
+ return true;
+ }
+ }
+
+ // Lease is held by someone else and not expired
+ return false;
+
+ } catch (IOException | InterruptedException e) {
+ logger.warn(e.getMessage(), e);
+ throw new UnavailableStateException("Failed to renew lock", e);
+ }
+ }
+
+ @Override
+ public boolean tryLock() throws UnavailableStateException {
+ return renewLock();
+ }
+
+ private boolean createLock() throws UnavailableStateException {
+ // Lease doesn't exist, create a new one
+ try {
+ String nowTime = OffsetDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+
+ String leaseJson = "{\n" +
+ " \"apiVersion\": \"coordination.k8s.io/v1\",\n" +
+ " \"kind\": \"Lease\",\n" +
+ " \"metadata\": {\n" +
+ " \"name\": \"" + id + "\",\n" +
+ " \"namespace\": \"" + namespace + "\"\n" +
+ " },\n" +
+ " \"spec\": {\n" +
+ " \"holderIdentity\": \"" + hostname + "\",\n" +
+ " \"acquireTime\": \"" + nowTime + "\",\n" +
+ " \"renewTime\": \"" + nowTime + "\",\n" +
+ " \"leaseDurationSeconds\": " + leasePeriodSeconds + "\n" +
+ " }\n" +
+ "}";
+
+ String path = "/apis/coordination.k8s.io/v1/namespaces/" + namespace + "/leases";
+ kubeClient.post(path, leaseJson);
+ return true;
+ } catch (IOException | InterruptedException createException) {
+ // Race condition - someone else created it first
+ logger.warn(createException.getMessage(), createException);
+ return false;
+ }
+ }
+
+ private String buildLeaseJson(JsonObject existingLease, String holderIdentity, String acquireTime, String renewTime, int leaseDurationSeconds) {
+ JsonObject metadata = existingLease.getJsonObject("metadata");
+ String resourceVersion = metadata != null ? metadata.getString("resourceVersion", "") : "";
+
+ return "{\n" +
+ " \"apiVersion\": \"coordination.k8s.io/v1\",\n" +
+ " \"kind\": \"Lease\",\n" +
+ " \"metadata\": {\n" +
+ " \"name\": \"" + id + "\",\n" +
+ " \"namespace\": \"" + namespace + "\",\n" +
+ " \"resourceVersion\": \"" + resourceVersion + "\"\n" +
+ " },\n" +
+ " \"spec\": {\n" +
+ " \"holderIdentity\": \"" + holderIdentity + "\",\n" +
+ " \"acquireTime\": \"" + acquireTime + "\",\n" +
+ " \"renewTime\": \"" + renewTime + "\",\n" +
+ " \"leaseDurationSeconds\": " + leaseDurationSeconds + "\n" +
+ " }\n" +
+ "}";
+ }
+
+ @Override
+ public void unlock() throws UnavailableStateException {
+ try {
+ // Try to read the existing lease
+ String path = "/apis/coordination.k8s.io/v1/namespaces/" + namespace + "/leases/" + id;
+ JsonObject existingLease = kubeClient.get(path);
+
+ if (existingLease == null) {
+ // Lease doesn't exist - already unlocked or expired
+ logger.debug("Lock {} not found, already released", id);
+ return;
+ }
+
+ JsonObject spec = existingLease.getJsonObject("spec");
+ if (spec == null) {
+ logger.warn("Lease spec is null during unlock");
+ return;
+ }
+
+ String holderIdentity = spec.getString("holderIdentity", null);
+
+ // Only unlock if we hold the lease
+ if (hostname.equals(holderIdentity)) {
+ // Delete the lease to release the lock
+ kubeClient.delete(path);
+ logger.debug("Released lock: {}", id);
+ } else {
+ logger.warn("Attempted to unlock {} but it's held by {}", id, holderIdentity);
+ }
+ } catch (IOException | InterruptedException e) {
+ throw new UnavailableStateException("Failed to unlock: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void addListener(UnavailableLockListener listener) {
+
+ }
+
+ @Override
+ public void removeListener(UnavailableLockListener listener) {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLockManager.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLockManager.java
new file mode 100644
index 00000000000..fd8d1e76321
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLockManager.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.artemis.lock.kube;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.activemq.artemis.lockmanager.AbstractDistributedLockManager;
+import org.apache.activemq.artemis.lockmanager.DistributedLock;
+import org.apache.activemq.artemis.lockmanager.MutableLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Kubernetes-based distributed lock manager implementation.
+ *
+ * Configuration parameters:
+ *
+ * - hostname: The hostname identifier for this instance.
+ * Default: value of the HOSTNAME environment variable
+ * - namespace: The Kubernetes namespace where locks are managed.
+ * Default: reads from /var/run/secrets/kubernetes.io/serviceaccount/namespace,
+ * falls back to "default" if unavailable
+ * - lease-timeout: The lease timeout in seconds.
+ * Default: 30 seconds
+ *
+ */
+public class KubeLockManager extends AbstractDistributedLockManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String HOSTNAME = "hostname";
+ private static final String NAMESPACE = "namespace";
+ private static final String LEASE_TIMEOUT = "lease-timeout";
+ private static final String API_SERVER = "api-server";
+ private static final String TOKEN = "token";
+ private static final Set VALID_PARAMS = Stream.of(
+ HOSTNAME,
+ NAMESPACE,
+ LEASE_TIMEOUT,
+ API_SERVER,
+ TOKEN).collect(Collectors.toSet());
+ private static final String VALID_PARAMS_ON_ERROR = VALID_PARAMS.stream().collect(joining(","));
+
+ private String hostname;
+ private String namespace;
+ private String apiServer;
+ private String token;
+ long leaseTimeout;
+
+
+
+ public KubeLockManager(Map config) {
+ this(config.get(HOSTNAME),
+ config.get(NAMESPACE),
+ config.get(LEASE_TIMEOUT) != null ? Long.parseLong(config.get(LEASE_TIMEOUT)) : 30,
+ config.get(API_SERVER),
+ config.get(TOKEN));
+ validateParameters(config);
+ }
+
+ public KubeLockManager(String hostname, String namespace, long leaseTimeout, String apiServer, String token) {
+ this.hostname = hostname != null ? hostname : System.getenv("HOSTNAME");
+ this.leaseTimeout = leaseTimeout > 0 ? leaseTimeout : 30;
+
+ // Set namespace - use provided value, fall back to service account file, then "default"
+ if (namespace != null) {
+ this.namespace = namespace;
+ } else {
+ try {
+ this.namespace = Files.readString(Path.of("/var/run/secrets/kubernetes.io/serviceaccount/namespace")).trim();
+ logger.debug("Read namespace from Kubernetes service account: {}", this.namespace);
+ } catch (IOException e) {
+ logger.debug("Could not read namespace from service account, using default", e);
+ this.namespace = "default";
+ }
+ }
+
+ // Set API server - use provided value, fall back to environment variable, then default
+ if (apiServer != null) {
+ this.apiServer = apiServer;
+ } else {
+ String envApiServer = System.getenv("KUBE_API_SERVER");
+ this.apiServer = envApiServer != null ? envApiServer : "https://kubernetes.default.svc";
+ }
+
+ // Set token - use provided value, fall back to environment variable, then service account file
+ if (token != null) {
+ this.token = token;
+ } else {
+ String envToken = System.getenv("KUBE_TOKEN");
+ if (envToken != null) {
+ this.token = envToken;
+ logger.debug("Using token from KUBE_TOKEN environment variable");
+ } else {
+ try {
+ this.token = Files.readString(Path.of("/var/run/secrets/kubernetes.io/serviceaccount/token")).trim();
+ logger.debug("Read token from Kubernetes service account");
+ } catch (IOException e) {
+ throw new IllegalStateException("No token provided and could not read from service account at " +
+ "/var/run/secrets/kubernetes.io/serviceaccount/token. " +
+ "Provide token via constructor, configuration, or KUBE_TOKEN environment variable.", e);
+ }
+ }
+ }
+
+ logger.debug("KubeLockManager configured: hostname={}, namespace={}, apiServer={}, leaseTimeout={}",
+ this.hostname, this.namespace, this.apiServer, this.leaseTimeout);
+ }
+
+
+ @Override
+ protected Set getValidParams() {
+ return VALID_PARAMS;
+ }
+
+ @Override
+ public void addUnavailableManagerListener(UnavailableManagerListener listener) {
+
+ }
+
+ @Override
+ public void removeUnavailableManagerListener(UnavailableManagerListener listener) {
+
+ }
+
+ @Override
+ public boolean start(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
+ return true;
+ }
+
+ @Override
+ public void start() throws InterruptedException, ExecutionException {
+ }
+
+ @Override
+ public boolean isStarted() {
+ return true;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public DistributedLock getDistributedLock(String lockId) throws InterruptedException, ExecutionException, TimeoutException {
+ return new KubeLock(hostname, namespace, lockId, leaseTimeout, apiServer, token);
+ }
+
+ @Override
+ public MutableLong getMutableLong(String mutableLongId) throws InterruptedException, ExecutionException, TimeoutException {
+ return new KubeMutableLong(namespace, mutableLongId, apiServer, token);
+ }
+}
diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeMutableLong.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeMutableLong.java
new file mode 100644
index 00000000000..f4dc45dded4
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeMutableLong.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.artemis.lock.kube;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.json.JsonObject;
+import org.apache.activemq.artemis.lockmanager.MutableLong;
+import org.apache.activemq.artemis.lockmanager.UnavailableStateException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubeMutableLong implements MutableLong {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final String VALUE_KEY = "value";
+
+ private final String namespace;
+ private final String id;
+ private final String configMapName;
+ private final KubeRestClient kubeClient;
+
+ public KubeMutableLong(String namespace, String id, String apiServer, String token) {
+ this.namespace = namespace;
+ this.id = id;
+ this.configMapName = sanitizeConfigMapName(id);
+ this.kubeClient = new KubeRestClient(apiServer, token);
+ }
+
+ private static String sanitizeConfigMapName(String name) {
+ return name.toLowerCase().replaceAll("[^a-z0-9.-]", "-");
+ }
+
+ @Override
+ public String getMutableLongId() {
+ return id;
+ }
+
+ @Override
+ public long get() throws UnavailableStateException {
+ try {
+ String path = "/api/v1/namespaces/" + namespace + "/configmaps/" + configMapName;
+ JsonObject configMap = kubeClient.get(path);
+
+ if (configMap == null) {
+ return 0L;
+ }
+
+ JsonObject data = configMap.getJsonObject("data");
+ if (data == null || !data.containsKey(VALUE_KEY)) {
+ return 0L;
+ }
+
+ String valueStr = data.getString(VALUE_KEY, "0");
+ return Long.parseLong(valueStr);
+ } catch (IOException | InterruptedException e) {
+ throw new UnavailableStateException("Failed to get mutable long value: " + e.getMessage(), e);
+ } catch (NumberFormatException e) {
+ throw new UnavailableStateException("Invalid long value stored in ConfigMap", e);
+ }
+ }
+
+ @Override
+ public void set(long value) throws UnavailableStateException {
+ try {
+ String path = "/api/v1/namespaces/" + namespace + "/configmaps/" + configMapName;
+ JsonObject existingConfigMap = kubeClient.get(path);
+
+ if (existingConfigMap != null) {
+ // Update existing ConfigMap
+ String updatedJson = buildConfigMapJson(existingConfigMap, value);
+ kubeClient.put(path, updatedJson);
+ } else {
+ // Create new ConfigMap
+ String newConfigMapJson = "{\n" +
+ " \"apiVersion\": \"v1\",\n" +
+ " \"kind\": \"ConfigMap\",\n" +
+ " \"metadata\": {\n" +
+ " \"name\": \"" + configMapName + "\",\n" +
+ " \"namespace\": \"" + namespace + "\"\n" +
+ " },\n" +
+ " \"data\": {\n" +
+ " \"" + VALUE_KEY + "\": \"" + value + "\"\n" +
+ " }\n" +
+ "}";
+
+ String createPath = "/api/v1/namespaces/" + namespace + "/configmaps";
+ kubeClient.post(createPath, newConfigMapJson);
+ }
+ } catch (IOException | InterruptedException e) {
+ throw new UnavailableStateException("Failed to set mutable long value: " + e.getMessage(), e);
+ }
+ }
+
+ private String buildConfigMapJson(JsonObject existingConfigMap, long value) {
+ JsonObject metadata = existingConfigMap.getJsonObject("metadata");
+ String resourceVersion = metadata != null ? metadata.getString("resourceVersion", "") : "";
+
+ return "{\n" +
+ " \"apiVersion\": \"v1\",\n" +
+ " \"kind\": \"ConfigMap\",\n" +
+ " \"metadata\": {\n" +
+ " \"name\": \"" + configMapName + "\",\n" +
+ " \"namespace\": \"" + namespace + "\",\n" +
+ " \"resourceVersion\": \"" + resourceVersion + "\"\n" +
+ " },\n" +
+ " \"data\": {\n" +
+ " \"" + VALUE_KEY + "\": \"" + value + "\"\n" +
+ " }\n" +
+ "}";
+ }
+
+ @Override
+ public void close() {
+ // No cleanup needed for now
+ }
+}
diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeRestClient.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeRestClient.java
new file mode 100644
index 00000000000..82b3d19f32b
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeRestClient.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.artemis.lock.kube;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+
+import org.apache.activemq.artemis.json.JsonObject;
+import org.apache.activemq.artemis.utils.JsonLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubeRestClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ static final String DEFAULT_API_SERVER = "https://kubernetes.default.svc";
+ static final String SERVICE_ACCOUNT_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token";
+ static final String SERVICE_ACCOUNT_CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt";
+
+ private final String apiServer;
+ private final String token;
+ private final HttpClient httpClient;
+
+ public KubeRestClient(String apiServer, String token) {
+ this.apiServer = apiServer;
+ this.token = token;
+ this.httpClient = HttpClient.newBuilder()
+ .connectTimeout(Duration.ofSeconds(10))
+ .build();
+ }
+
+ public JsonObject get(String path) throws IOException, InterruptedException {
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(apiServer + path))
+ .header("Authorization", "Bearer " + token)
+ .header("Accept", "application/json")
+ .GET()
+ .build();
+
+ HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+
+ if (response.statusCode() == 404) {
+ return null;
+ }
+
+ if (response.statusCode() < 200 || response.statusCode() >= 300) {
+ throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
+ }
+
+ return JsonLoader.readObject(new StringReader(response.body()));
+ }
+
+ public JsonObject post(String path, String jsonBody) throws IOException, InterruptedException {
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(apiServer + path))
+ .header("Authorization", "Bearer " + token)
+ .header("Content-Type", "application/json")
+ .header("Accept", "application/json")
+ .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
+ .build();
+
+ HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+
+ if (response.statusCode() < 200 || response.statusCode() >= 300) {
+ throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
+ }
+
+ return JsonLoader.readObject(new StringReader(response.body()));
+ }
+
+ public JsonObject put(String path, String jsonBody) throws IOException, InterruptedException {
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(apiServer + path))
+ .header("Authorization", "Bearer " + token)
+ .header("Content-Type", "application/json")
+ .header("Accept", "application/json")
+ .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
+ .build();
+
+ HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+
+ if (response.statusCode() < 200 || response.statusCode() >= 300) {
+ throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
+ }
+
+ return JsonLoader.readObject(new StringReader(response.body()));
+ }
+
+ public void delete(String path) throws IOException, InterruptedException {
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(apiServer + path))
+ .header("Authorization", "Bearer " + token)
+ .header("Accept", "application/json")
+ .DELETE()
+ .build();
+
+ HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+
+ if (response.statusCode() != 404 && (response.statusCode() < 200 || response.statusCode() >= 300)) {
+ throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
+ }
+ }
+
+ public int getStatusCode(String path) throws IOException, InterruptedException {
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(apiServer + path))
+ .header("Authorization", "Bearer " + token)
+ .header("Accept", "application/json")
+ .GET()
+ .build();
+
+ HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+ return response.statusCode();
+ }
+}
diff --git a/artemis-lockmanager/artemis-lockmanager-api/pom.xml b/artemis-lockmanager/artemis-lockmanager-api/pom.xml
index f5bd3223ada..4191819fa57 100644
--- a/artemis-lockmanager/artemis-lockmanager-api/pom.xml
+++ b/artemis-lockmanager/artemis-lockmanager-api/pom.xml
@@ -31,11 +31,4 @@
${project.basedir}/../..
-
-
- org.apache.artemis
- artemis-commons
-
-
-
diff --git a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/AbstractDistributedLockManager.java b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/AbstractDistributedLockManager.java
new file mode 100644
index 00000000000..2204b10e500
--- /dev/null
+++ b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/AbstractDistributedLockManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.lockmanager;
+
+import java.util.Map;
+import java.util.Set;
+
+import static java.util.stream.Collectors.joining;
+
+public abstract class AbstractDistributedLockManager implements DistributedLockManager {
+
+ public AbstractDistributedLockManager() {
+ }
+
+ public AbstractDistributedLockManager(Map properties) {
+ validateParameters(properties);
+ }
+
+ protected String commaOnParameters() {
+ return getValidParams().stream().collect(joining(","));
+ }
+
+
+ protected void validateParameters(Map config) {
+ config.forEach((parameterName, ignore) -> validateParameter(parameterName));
+ }
+
+ protected abstract Set getValidParams();
+
+ protected void validateParameter(String parameterName) {
+ if (!getValidParams().contains(parameterName)) {
+ throw new IllegalArgumentException("non existent parameter " + parameterName + ": accepted list is " + commaOnParameters());
+ }
+ }
+
+
+
+}
diff --git a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java
index d42f8e985fa..71b7132099d 100644
--- a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java
+++ b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java
@@ -16,21 +16,20 @@
*/
package org.apache.activemq.artemis.lockmanager;
+import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.activemq.artemis.utils.ClassloadingUtil;
-
public interface DistributedLockManager extends AutoCloseable {
static DistributedLockManager newInstanceOf(String className, Map properties) throws Exception {
- return (DistributedLockManager) ClassloadingUtil.getInstanceForParamsWithTypeCheck(className,
- DistributedLockManager.class,
- DistributedLockManager.class.getClassLoader(),
- new Class[]{Map.class},
- properties);
+ return (DistributedLockManager) getInstanceForParamsWithTypeCheck(className,
+ DistributedLockManager.class,
+ DistributedLockManager.class.getClassLoader(),
+ new Class[]{Map.class},
+ properties);
}
@FunctionalInterface
@@ -59,4 +58,24 @@ interface UnavailableManagerListener {
default void close() {
stop();
}
+
+ // This is copied from ClassLoadingUtils..
+ // I need to cut the dependency here to make it easier for external modules to implement the API here.
+ private static Object getInstanceForParamsWithTypeCheck(String className,
+ Class> expectedType,
+ ClassLoader loader, Class>[] parameterTypes, Object... params) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+ final Class> clazz = loadWithCheck(className, expectedType, loader);
+ return clazz.getDeclaredConstructor(parameterTypes).newInstance(params);
+ }
+
+ private static Class> loadWithCheck(String className,
+ Class> expectedType,
+ ClassLoader loader) throws ClassNotFoundException {
+ Class> clazz = loader.loadClass(className);
+ if (!expectedType.isAssignableFrom(clazz)) {
+ throw new IllegalStateException("clazz [" + className + "] is not assignable from expected type: " + expectedType);
+ }
+ return clazz;
+ }
+
}
\ No newline at end of file
diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
index 3329b88bf79..a47e9e7cd8b 100644
--- a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
+++ b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
@@ -23,11 +23,14 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.activemq.artemis.lockmanager.AbstractDistributedLockManager;
import org.apache.activemq.artemis.lockmanager.DistributedLock;
-import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
import org.apache.activemq.artemis.lockmanager.MutableLong;
import org.apache.activemq.artemis.lockmanager.UnavailableStateException;
@@ -42,14 +45,25 @@
* The directory must be created in advance before using this lock manager.
*
*/
-public class FileBasedLockManager implements DistributedLockManager {
+public class FileBasedLockManager extends AbstractDistributedLockManager {
private final File locksFolder;
private final Map locks;
private boolean started;
+ private static final String LOCKS_FOLDER = "locks-folder";
+ private static final Set VALID_PARAMS = Stream.of(
+ LOCKS_FOLDER).collect(Collectors.toSet());
+
+ @Override
+ protected Set getValidParams() {
+ return VALID_PARAMS;
+ }
+
public FileBasedLockManager(Map args) {
this(new File(args.get("locks-folder")));
+
+ validateParameters(args);
}
public FileBasedLockManager(File locksFolder) {
diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
index e6aa6689ca2..cb8ba223390 100644
--- a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
+++ b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
@@ -28,8 +28,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.activemq.artemis.lockmanager.AbstractDistributedLockManager;
import org.apache.activemq.artemis.lockmanager.DistributedLock;
-import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
import org.apache.activemq.artemis.lockmanager.MutableLong;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -42,7 +42,6 @@
import org.apache.curator.utils.DebugUtils;
import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.joining;
/**
* ZooKeeper-based distributed lock manager using Apache Curator.
@@ -58,7 +57,7 @@
* retries-ms (optional, default: 1000): Delay in milliseconds between retry attempts
*
*/
-public class CuratorDistributedLockManager implements DistributedLockManager, ConnectionStateListener {
+public class CuratorDistributedLockManager extends AbstractDistributedLockManager implements ConnectionStateListener {
enum PrimitiveType {
lock, mutableLong;
@@ -111,6 +110,11 @@ public int hashCode() {
}
}
+ @Override
+ protected Set getValidParams() {
+ return VALID_PARAMS;
+ }
+
private static final String CONNECT_STRING_PARAM = "connect-string";
private static final String NAMESPACE_PARAM = "namespace";
private static final String SESSION_MS_PARAM = "session-ms";
@@ -126,7 +130,6 @@ public int hashCode() {
CONNECTION_MS_PARAM,
RETRIES_PARAM,
RETRIES_MS_PARAM).collect(Collectors.toSet());
- private static final String VALID_PARAMS_ON_ERROR = VALID_PARAMS.stream().collect(joining(","));
// It's 9 times the default ZK tick time ie 2000 ms
private static final String DEFAULT_SESSION_TIMEOUT_MS = Integer.toString(18_000);
private static final String DEFAULT_CONNECTION_TIMEOUT_MS = Integer.toString(8_000);
@@ -135,17 +138,6 @@ public int hashCode() {
// why 1/3 of the session? https://cwiki.apache.org/confluence/display/CURATOR/TN14
private static final String DEFAULT_SESSION_PERCENT = Integer.toString(33);
- private static Map validateParameters(Map config) {
- config.forEach((parameterName, ignore) -> validateParameter(parameterName));
- return config;
- }
-
- private static void validateParameter(String parameterName) {
- if (!VALID_PARAMS.contains(parameterName)) {
- throw new IllegalArgumentException("non existent parameter " + parameterName + ": accepted list is " + VALID_PARAMS_ON_ERROR);
- }
- }
-
private CuratorFramework client;
private final Map primitives;
private List listeners;
@@ -161,10 +153,6 @@ private static void validateParameter(String parameterName) {
}
public CuratorDistributedLockManager(Map config) {
- this(validateParameters(config), true);
- }
-
- private CuratorDistributedLockManager(Map config, boolean ignore) {
this(config.get(CONNECT_STRING_PARAM),
config.get(NAMESPACE_PARAM),
Integer.parseInt(config.getOrDefault(SESSION_MS_PARAM, DEFAULT_SESSION_TIMEOUT_MS)),
@@ -172,6 +160,7 @@ private CuratorDistributedLockManager(Map config, boolean ignore
Integer.parseInt(config.getOrDefault(CONNECTION_MS_PARAM, DEFAULT_CONNECTION_TIMEOUT_MS)),
Integer.parseInt(config.getOrDefault(RETRIES_PARAM, DEFAULT_RETRIES)),
Integer.parseInt(config.getOrDefault(RETRIES_MS_PARAM, DEFAULT_RETRIES_MS)));
+ validateParameters(config);
}
private CuratorDistributedLockManager(String connectString,
diff --git a/artemis-lockmanager/pom.xml b/artemis-lockmanager/pom.xml
index cbb24a3aa8f..9c5c3f05de3 100644
--- a/artemis-lockmanager/pom.xml
+++ b/artemis-lockmanager/pom.xml
@@ -32,6 +32,7 @@
artemis-lockmanager-api
artemis-lockmanager-ri
+ artemis-kube-lock
diff --git a/artemis-pom/pom.xml b/artemis-pom/pom.xml
index f8edce0c30e..05cf276baa5 100644
--- a/artemis-pom/pom.xml
+++ b/artemis-pom/pom.xml
@@ -951,6 +951,20 @@
+
+