Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions artemis-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,11 @@
<artifactId>artemis-lockmanager-ri</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-kube-lock</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-ra</artifactId>
Expand Down
46 changes: 46 additions & 0 deletions artemis-lockmanager/artemis-kube-lock/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.artemis</groupId>
<artifactId>artemis-lockmanager</artifactId>
<version>2.55.0-SNAPSHOT</version>
</parent>

<artifactId>artemis-kube-lock</artifactId>
<packaging>bundle</packaging>
<name>Kubernetes Lock Manager</name>

<properties>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.artemis</groupId>
<artifactId>artemis-lockmanager-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.artemis</groupId>
<artifactId>artemis-commons</artifactId>
</dependency>
</dependencies>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.artemis.lock.kube;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.activemq.artemis.json.JsonObject;
import org.apache.activemq.artemis.lockmanager.DistributedLock;
import org.apache.activemq.artemis.lockmanager.UnavailableStateException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KubeLock implements DistributedLock {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

final String hostname;
final String namespace;
final String id;
private final long leasePeriodSeconds;

private final KubeRestClient kubeClient;


public KubeLock(String hostname, String namespace, String id, long leasePeriodSeconds, String apiServer, String token) {
this.hostname = hostname;
this.namespace = namespace;
this.id = id;
this.leasePeriodSeconds = leasePeriodSeconds;
this.kubeClient = new KubeRestClient(apiServer, token);
}

@Override
public String getLockId() {
return id;
}

@Override
public boolean isHeldByCaller() throws UnavailableStateException {
return renewLock();
}

private boolean renewLock() throws UnavailableStateException {
try {
// Try to read the existing lease
String path = "/apis/coordination.k8s.io/v1/namespaces/" + namespace + "/leases/" + id;
JsonObject existingLease = kubeClient.get(path);

if (existingLease == null) {
logger.debug("Create lock");
return createLock();
}

JsonObject spec = existingLease.getJsonObject("spec");
if (spec == null) {
logger.warn("Lease spec is null");
return false;
}

String holderIdentity = spec.getString("holderIdentity", null);
String renewTimeStr = spec.getString("renewTime", null);
int leaseDuration = spec.getInt("leaseDurationSeconds", (int) leasePeriodSeconds);

logger.debug("renewLock, Read lease: renewTime={}, holderIdentity={}, leaseDuration={}",
renewTimeStr, holderIdentity, leaseDuration);

// Check if we already hold this lease
if (hostname.equals(holderIdentity)) {
// Renew the lease
String newRenewTime = OffsetDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
String updatedJson = buildLeaseJson(existingLease, holderIdentity, spec.getString("acquireTime", newRenewTime), newRenewTime, leaseDuration);
kubeClient.put(path, updatedJson);
return true;
}

// Check if the lease has expired by using the leaseDurationSeconds from the lease spec
if (renewTimeStr != null) {
OffsetDateTime renewTime = OffsetDateTime.parse(renewTimeStr);
OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC);

if (logger.isDebugEnabled()) {
logger.debug("renew period:: {}, now = {}, between={}, between seconds={}", renewTime, now, Duration.between(renewTime, now), Duration.between(renewTime, now).toSeconds());
}
long ageSeconds = Duration.between(renewTime, now).toSeconds();

if (ageSeconds > leaseDuration) {
// Lease has expired, try to acquire it
OffsetDateTime newTime = OffsetDateTime.now(ZoneOffset.UTC);
String newRenewTime = newTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
String newAcquireTime = newTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);

logger.debug("acquiring expired lease. Setting renewTime = {}, holderIdentity = {}, leaseDuration = {}", newRenewTime, hostname, leasePeriodSeconds);

String updatedJson = buildLeaseJson(existingLease, hostname, newAcquireTime, newRenewTime, (int) leasePeriodSeconds);
kubeClient.put(path, updatedJson);
return true;
}
}

// Lease is held by someone else and not expired
return false;

} catch (IOException | InterruptedException e) {
logger.warn(e.getMessage(), e);
throw new UnavailableStateException("Failed to renew lock", e);
}
}

@Override
public boolean tryLock() throws UnavailableStateException {
return renewLock();
}

private boolean createLock() throws UnavailableStateException {
// Lease doesn't exist, create a new one
try {
String nowTime = OffsetDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);

String leaseJson = "{\n" +
" \"apiVersion\": \"coordination.k8s.io/v1\",\n" +
" \"kind\": \"Lease\",\n" +
" \"metadata\": {\n" +
" \"name\": \"" + id + "\",\n" +
" \"namespace\": \"" + namespace + "\"\n" +
" },\n" +
" \"spec\": {\n" +
" \"holderIdentity\": \"" + hostname + "\",\n" +
" \"acquireTime\": \"" + nowTime + "\",\n" +
" \"renewTime\": \"" + nowTime + "\",\n" +
" \"leaseDurationSeconds\": " + leasePeriodSeconds + "\n" +
" }\n" +
"}";

String path = "/apis/coordination.k8s.io/v1/namespaces/" + namespace + "/leases";
kubeClient.post(path, leaseJson);
return true;
} catch (IOException | InterruptedException createException) {
// Race condition - someone else created it first
logger.warn(createException.getMessage(), createException);
return false;
}
}

private String buildLeaseJson(JsonObject existingLease, String holderIdentity, String acquireTime, String renewTime, int leaseDurationSeconds) {
JsonObject metadata = existingLease.getJsonObject("metadata");
String resourceVersion = metadata != null ? metadata.getString("resourceVersion", "") : "";

return "{\n" +
" \"apiVersion\": \"coordination.k8s.io/v1\",\n" +
" \"kind\": \"Lease\",\n" +
" \"metadata\": {\n" +
" \"name\": \"" + id + "\",\n" +
" \"namespace\": \"" + namespace + "\",\n" +
" \"resourceVersion\": \"" + resourceVersion + "\"\n" +
" },\n" +
" \"spec\": {\n" +
" \"holderIdentity\": \"" + holderIdentity + "\",\n" +
" \"acquireTime\": \"" + acquireTime + "\",\n" +
" \"renewTime\": \"" + renewTime + "\",\n" +
" \"leaseDurationSeconds\": " + leaseDurationSeconds + "\n" +
" }\n" +
"}";
}

@Override
public void unlock() throws UnavailableStateException {
try {
// Try to read the existing lease
String path = "/apis/coordination.k8s.io/v1/namespaces/" + namespace + "/leases/" + id;
JsonObject existingLease = kubeClient.get(path);

if (existingLease == null) {
// Lease doesn't exist - already unlocked or expired
logger.debug("Lock {} not found, already released", id);
return;
}

JsonObject spec = existingLease.getJsonObject("spec");
if (spec == null) {
logger.warn("Lease spec is null during unlock");
return;
}

String holderIdentity = spec.getString("holderIdentity", null);

// Only unlock if we hold the lease
if (hostname.equals(holderIdentity)) {
// Delete the lease to release the lock
kubeClient.delete(path);
logger.debug("Released lock: {}", id);
} else {
logger.warn("Attempted to unlock {} but it's held by {}", id, holderIdentity);
}
} catch (IOException | InterruptedException e) {
throw new UnavailableStateException("Failed to unlock: " + e.getMessage(), e);
}
}

@Override
public void addListener(UnavailableLockListener listener) {

}

@Override
public void removeListener(UnavailableLockListener listener) {

}

@Override
public void close() {

}
}
Loading
Loading