Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
[versions]
# Core technologies
kotlin = "2.0.20"
java = "11"
kotlinx-serialization = "1.7.3"
kotlinx-io = "0.3.0"
ksp = "1.9.10-1.0.13"
kotlin = "2.1.10"
java = "17"
kotlinx-serialization = "1.8.0"
kotlinx-io = "0.7.0"
ksp = "2.1.10-1.0.31"
reactor = "3.7.4"

# Web Framework
ktor = "3.0.0"
ktor = "3.1.1"

# Android
android = "8.1.0"

# Database
h2 = "2.2.224"
postgres = "42.6.0"
exposed = "0.44.0"
mongodb = "5.2.0"
h2 = "2.3.232"
postgres = "42.7.5"
exposed = "0.60.0"
mongodb = "5.3.1"

# Logging
logback = "1.5.12"
kotlin-logging = "5.1.0"
logging_capabilities = "0.11.1"

# Asynchronous and Concurrency
atomicfu = "0.25.0"
kotlinx-coroutines = "1.9.0"
atomicfu = "0.27.0"
kotlinx-coroutines = "1.10.1"

# Testing
#kotest = "6.0.0.M1"
Expand All @@ -36,7 +36,7 @@ kotest-test-containers = "2.0.2"
mockk = "1.13.4"
mockative = "2.0.1"
kmock = "0.3.0-rc08"
testcontainers = "1.19.8"
testcontainers = "1.20.6"
redis-testcontainers = "1.6.4"

# Code Quality and Coverage
Expand All @@ -47,10 +47,11 @@ koverBadge = "0.0.6"
detekt = "1.23.1"

# Date and Time
kotlinx-datetime = "0.4.0"
kotlinx-datetime = "0.6.2"

# Functional Programming
arrow = "1.2.4"
reactor = "3.7.4"

# Messaging
kafka = "3.5.1"
Expand All @@ -74,8 +75,8 @@ gradle-release = "3.0.2"
nexusPublish = "2.0.0"

# Miscellaneous
krontab = "2.2.1"
uuid = "0.8.1"
krontab = "2.7.2"
uuid = "0.8.4"

[libraries]
# Core libraries
Expand All @@ -87,6 +88,7 @@ kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serializa
kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "kotlinx-coroutines" }
kotlinx-coroutines-reactive = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-reactive", version.ref = "kotlinx-coroutines" }
kotlinx-coroutines-test = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-test", version.ref = "kotlinx-coroutines" }
kotlinx-coroutines-reactive = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-reactive", version.ref = "kotlinx-coroutines" }
kotlinx-datetime = { module = "org.jetbrains.kotlinx:kotlinx-datetime", version.ref = "kotlinx-datetime" }
kotlinx-io-core = { module = "org.jetbrains.kotlinx:kotlinx-io-core", version.ref = "kotlinx-io" }
kotlin-reflect = { module = "org.jetbrains.kotlin:kotlin-reflect", version.ref = "kotlin" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.kotest.assertions.fail
import io.kotest.core.spec.style.FunSpec
import io.kotest.core.spec.style.scopes.ContainerScope
import io.kotest.datatest.withData
import io.kotest.matchers.ints.shouldBeGreaterThan
import io.kotest.matchers.ints.shouldBeGreaterThanOrEqual
import io.ktor.server.application.log
import io.ktor.server.config.MapApplicationConfig
import io.ktor.server.config.mergeWith
Expand Down Expand Up @@ -77,7 +77,7 @@ abstract class TaskSchedulingPluginTest : FunSpec() {

try {
with(taskLogsAndApplications.map { it.first }.flatten()) {
size shouldBeGreaterThan executions - 2
size shouldBeGreaterThanOrEqual executions - 2
with(groupingBy { it }.eachCount()) {
val errors =
this.mapNotNull {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.LiteralOp
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.SqlExpressionBuilder.neq
import org.jetbrains.exposed.sql.SqlExpressionBuilder.isNull
import org.jetbrains.exposed.sql.Table
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.insertIgnore
Expand Down Expand Up @@ -59,12 +59,12 @@ public class JdbcLockManager(
database,
transactionIsolation = Connection.TRANSACTION_READ_COMMITTED,
) {
repetitionAttempts = 0
maxAttempts = 1
debug = true
taskLockTable.insertIgnore {
it[name] = task.name
it[concurrencyIndex] = taskConcurrencyIndex
it[lockedAt] = Instant.fromEpochMilliseconds(0)
it[lockedAt] = null
}
}.insertedCount == 1

Expand All @@ -84,7 +84,6 @@ public class JdbcLockManager(
selectClause(
task,
concurrencyIndex,
taskExecutionInstant,
)
},
) {
Expand All @@ -103,16 +102,37 @@ public class JdbcLockManager(
}
}

override suspend fun releaseLockKey(key: JdbcTaskLock) {}
override suspend fun releaseLockKey(key: JdbcTaskLock) {
newSuspendedTransaction(
application.coroutineContext,
db = database,
transactionIsolation = Connection.TRANSACTION_READ_COMMITTED,
) {
taskLockTable.update(
where = {
selectClauseForRelease(
key,
)
},
) {
it[lockedAt] = null
}
}
}

override fun close() {}

private fun selectClause(
task: Task,
concurrencyIndex: Int,
taskExecutionInstant: Instant,
) = (taskLockTable.name eq task.name and taskLockTable.concurrencyIndex.eq(concurrencyIndex)) and
lockedAt.neq(LiteralOp(KotlinInstantColumnType(), taskExecutionInstant))
lockedAt.isNull()

private fun selectClauseForRelease(lock: JdbcTaskLock) =
taskLockTable.name eq lock.name and taskLockTable.concurrencyIndex.eq(lock.concurrencyIndex) and
lockedAt.eq(
LiteralOp(KotlinInstantColumnType(), Instant.fromEpochMilliseconds(lock.lockedAt.unixMillisLong)),
)
}

public class JdbcTaskLock(
Expand All @@ -128,13 +148,13 @@ public abstract class ExposedTaskLockTable(
) : Table(tableName) {
public abstract val name: Column<String>
public abstract val concurrencyIndex: Column<Int>
public abstract val lockedAt: Column<Instant>
public abstract val lockedAt: Column<Instant?>
}

public object DefaultTaskLockTable : ExposedTaskLockTable("task_locks") {
override val name: Column<String> = text("_name")
override val concurrencyIndex: Column<Int> = integer("concurrency_index")
override val lockedAt: Column<Instant> = timestamp("locked_at").index()
override val lockedAt: Column<Instant?> = timestamp("locked_at").nullable().index()

override val primaryKey: PrimaryKey = PrimaryKey(firstColumn = name, concurrencyIndex, name = "pk_task_locks")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public class MongoDBLockManager(
Filters.and(
Filters.eq(MongoDbTaskLock::name.name, task.name),
Filters.eq(MongoDbTaskLock::concurrencyIndex.name, concurrencyIndex),
Filters.eq(MongoDbTaskLock::lockedAt.name, DateTime.EPOCH),
),
Filters.ne(MongoDbTaskLock::lockedAt.name, executionTime),
)
val updates =
Updates.combine(
Expand All @@ -86,6 +86,7 @@ public class MongoDBLockManager(
Indexes.compoundIndex(
Indexes.ascending(MongoDbTaskLock::name.name),
Indexes.ascending(MongoDbTaskLock::concurrencyIndex.name),
Indexes.ascending(MongoDbTaskLock::lockedAt.name),
),
IndexOptions().unique(true),
)
Expand All @@ -103,6 +104,7 @@ public class MongoDBLockManager(
Filters.and(
Filters.eq(MongoDbTaskLock::name.name, task.name),
Filters.eq(MongoDbTaskLock::concurrencyIndex.name, taskConcurrencyIndex),
Filters.eq(MongoDbTaskLock::lockedAt.name, DateTime.EPOCH),
),
).firstOrNull()
?.let { false } ?: runCatching {
Expand All @@ -127,7 +129,32 @@ public class MongoDBLockManager(
}
}

protected override suspend fun releaseLockKey(key: MongoDbTaskLock) {}
protected override suspend fun releaseLockKey(key: MongoDbTaskLock) {
val query =
Filters.and(
Filters.and(
Filters.eq(MongoDbTaskLock::name.name, key.name),
Filters.eq(MongoDbTaskLock::concurrencyIndex.name, key.concurrencyIndex),
Filters.eq(MongoDbTaskLock::lockedAt.name, key.lockedAt),
),
)
val updates =
Updates.combine(
Updates.set(MongoDbTaskLock::lockedAt.name, DateTime.EPOCH),
)
val options = FindOneAndUpdateOptions().upsert(false)
client.startSession().use { session ->
session.startTransaction(transactionOptions = majorityJTransaction())
runCatching {
collection.findOneAndUpdate(query, updates, options).also {
session.commitTransaction()
}
}.onFailure {
session.abortTransaction()
if (it !is MongoWriteException) throw it
}
}
}

override fun close() {
client.close()
Expand All @@ -143,7 +170,7 @@ public class MongoDBLockManager(
.build()
}

public data class MongoDbTaskLock(
public class MongoDbTaskLock(
override val name: String,
override val concurrencyIndex: Int,
override var lockedAt: DateTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import io.github.oshai.kotlinlogging.KotlinLogging
import io.ktor.server.application.Application
import korlibs.time.DateTime
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.jvm.JvmInline

internal val logger = KotlinLogging.logger { }
Expand All @@ -28,29 +30,42 @@ public class RedisLockManager(
private val lockExpirationMs: Long,
private val connectionAcquisitionTimeoutMs: Long,
) : TaskLockManager<RedisTaskLock>() {
private val mutex = Mutex()

override suspend fun init(tasks: List<Task>) {}

override suspend fun acquireLockKey(
task: Task,
executionTime: DateTime,
concurrencyIndex: Int,
): RedisTaskLock? =
connectionPool.withConnection(connectionAcquisitionTimeoutMs) { redisConnection ->
logger.debug { "${application.host()}: ${executionTime.format2()}: Acquiring lock for ${task.name} - $concurrencyIndex" }
val key = task.toRedisLockKey(executionTime, concurrencyIndex)
if (redisConnection.setNx(key.value, "1", lockExpirationMs) != null) {
logger.debug { "${application.host()}: ${executionTime.format2()}: Acquired lock for ${task.name} - $concurrencyIndex" }
return@withConnection key
}
null
} ?: run {
logger.debug {
"${application.host()}: ${executionTime.format2()}: Failed to acquire lock for ${task.name} - $concurrencyIndex"
): RedisTaskLock? {
logger.debug { "${application.host()}: ${executionTime.format2()}: Acquiring lock for ${task.name} - $concurrencyIndex" }
val key = task.toRedisLockKey(concurrencyIndex)
return mutex.withLock(key) {
connectionPool.withConnection(connectionAcquisitionTimeoutMs) { redisConnection ->
if (redisConnection.setNx(key.value, executionTime.format2(), lockExpirationMs) != null) {
logger.debug { "${application.host()}: ${executionTime.format2()}: Acquired lock for ${task.name} - $concurrencyIndex" }
return@withConnection key
} else {
return@withConnection null
}
} ?: run {
logger.debug {
"${application.host()}: ${executionTime.format2()}: Failed to acquire lock for ${task.name} - $concurrencyIndex"
}
null
}
null
}
}

override suspend fun releaseLockKey(key: RedisTaskLock) {}
override suspend fun releaseLockKey(key: RedisTaskLock) {
mutex.withLock(key) {
connectionPool.withConnection(connectionAcquisitionTimeoutMs) { redisConnection ->
logger.debug { "${application.host()}: Released lock for ${key.name} - ${key.concurrencyIndex}" }
redisConnection.del(key.value)
}
}
}

override fun close() {
runBlocking {
Expand All @@ -70,12 +85,10 @@ public value class RedisTaskLock internal constructor(
public val value: String,
) : TaskLock {
public companion object {
private const val DELIMITER = "-"
private const val DELIMITER = "-***-"

public fun Task.toRedisLockKey(
executionTime: DateTime,
concurrencyIndex: Int,
): RedisTaskLock = RedisTaskLock("${name.replace(DELIMITER, "_")}-$concurrencyIndex at ${executionTime.format2()}")
public fun Task.toRedisLockKey(concurrencyIndex: Int): RedisTaskLock =
RedisTaskLock("${name.replace(DELIMITER, "_")}$DELIMITER$concurrencyIndex")
}

override val name: String
Expand Down