99 changes: 99 additions & 0 deletions src/main/scala/dpla/api/v2/search/ConcurrencyLimiter.scala
@@ -0,0 +1,99 @@
package dpla.api.v2.search

import java.util.concurrent.{Semaphore, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

/** Limits concurrent execution of Futures using a semaphore.
  *
  * IMPORTANT: The `apply` method uses `tryAcquire` with a timeout, which BLOCKS
  * the calling thread for up to `timeoutSeconds`. In Akka actor contexts, this
  * means actor threads may be blocked. This is intentional to provide
  * backpressure when the system is overloaded, but callers should be aware of
  * this behavior.
  *
  * For high-throughput scenarios, consider:
  *   - Using a dedicated blocking dispatcher for operations that use this
  *     limiter
  *   - Tuning `maxConcurrent` and `timeoutSeconds` based on your workload
  *   - Monitoring permit acquisition times
  *
  * @param maxConcurrent
  *   Maximum number of concurrent operations allowed
  * @param timeoutSeconds
  *   Maximum time to wait for a permit before failing
  */
class ConcurrencyLimiter(
    val maxConcurrent: Int,
    val timeoutSeconds: Long
) {
  require(
    maxConcurrent > 0,
    s"maxConcurrent must be positive, got: $maxConcurrent"
  )
  require(
    timeoutSeconds > 0,
    s"timeoutSeconds must be positive, got: $timeoutSeconds"
  )

  private val semaphore = new Semaphore(maxConcurrent)
Comment on lines +26 to +39

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Expectation:
# - Production search/ES code should instantiate or receive ConcurrencyLimiter.
# - Legacy inline tryAcquire wrappers should be removed or intentionally documented.

printf '\n== ConcurrencyLimiter references ==\n'
rg -n -C4 --type scala '\bnew\s+ConcurrencyLimiter\b|\bConcurrencyLimiter\s*\('

printf '\n== Remaining inline semaphore limiter patterns ==\n'
rg -n -C6 --type scala '\bwithConcurrencyLimit\s*\(|\.tryAcquire\s*\(|case\s+e:\s*Throwable|case\s+NonFatal'

Repository: dpla/api

Length of output: 12060


Wire ConcurrencyLimiter into the ES request path to eliminate duplication.

ConcurrencyLimiter is defined and tested, but production code still bypasses it. ElasticSearchClient.withConcurrencyLimit (lines 95–115) implements its own semaphore wrapper with identical logic. Either integrate ConcurrencyLimiter into the ES request path or remove the inline implementation to prevent maintenance divergence.
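
A minimal sketch of the delegation suggested above, under stated assumptions. The real `ElasticSearchClient` is not shown in this diff, so `ElasticSearchClientSketch` and its constructor are hypothetical. The `ConcurrencyLimiter` here is a condensed stand-in for the class in this PR, included only so the block compiles on its own.

```scala
import java.util.concurrent.{Semaphore, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

// Condensed stand-in for the ConcurrencyLimiter in this PR (assumption:
// real code would import the actual class instead of redefining it).
final class ConcurrencyLimiter(maxConcurrent: Int, timeoutSeconds: Long) {
  private val semaphore = new Semaphore(maxConcurrent)

  def apply[T](f: => Future[T])(implicit ec: ExecutionContext): Future[T] =
    if (!semaphore.tryAcquire(timeoutSeconds, TimeUnit.SECONDS)) {
      Future.failed(new RuntimeException("Concurrency limit exceeded"))
    } else {
      try {
        // Release the permit once the wrapped Future completes.
        f.andThen { case _ => semaphore.release() }
      } catch {
        case NonFatal(e) =>
          // `f` threw before producing a Future, so release here.
          semaphore.release()
          Future.failed(e)
      }
    }

  def availablePermits: Int = semaphore.availablePermits()
}

// Hypothetical client shape: the limiter is injected, and the client keeps
// no semaphore code of its own.
final class ElasticSearchClientSketch(limiter: ConcurrencyLimiter)(implicit
    ec: ExecutionContext
) {
  // Single delegation point for every ES request.
  def withConcurrencyLimit[T](request: => Future[T]): Future[T] =
    limiter(request)
}
```

Injecting one shared limiter instance keeps the timeout and release semantics in a single place, which is the duplication concern this comment raises.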

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/scala/dpla/api/v2/search/ConcurrencyLimiter.scala` around lines 26 -
39, The project has duplicate semaphore logic: ConcurrencyLimiter is implemented
but ElasticSearchClient.withConcurrencyLimit contains an inline semaphore
wrapper; replace that inline logic by instantiating and using ConcurrencyLimiter
(or delegating to its methods) inside ElasticSearchClient.withConcurrencyLimit
so all ES requests go through the shared ConcurrencyLimiter instance (or remove
the inline wrapper and call ConcurrencyLimiter directly), ensuring you reuse the
ConcurrencyLimiter constructor/timeout behavior and its acquire/release
semantics rather than duplicating semaphore code.


  /** Wraps a Future with concurrency limiting.
    *
    *   - Attempts to acquire a permit, blocking for up to `timeoutSeconds`
    *   - If a permit is acquired, executes the Future and releases the permit
    *     on completion
    *   - If no permit becomes available within the timeout, returns a failed
    *     Future containing `ConcurrencyLimitExceeded`
    *   - Releases the permit even if constructing the Future throws a
    *     non-fatal exception
    *   - If the waiting thread is interrupted, restores the interrupt flag and
    *     returns a failed Future
    *
    * @param f
    *   The Future to execute (call-by-name, evaluated only if permit acquired)
    * @param ec
    *   ExecutionContext for Future callbacks
    * @return
    *   The wrapped Future, or a failed Future if permit couldn't be acquired
    */
  def apply[T](f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    val acquired =
      try {
        semaphore.tryAcquire(timeoutSeconds, TimeUnit.SECONDS)
      } catch {
        case e: InterruptedException =>
          // Restore the interrupt flag before reporting the failure.
          Thread.currentThread().interrupt()
          return Future.failed(e)
      }

    if (!acquired) {
      Future.failed(
        ConcurrencyLimitExceeded(
          maxConcurrent = maxConcurrent,
          timeoutSeconds = timeoutSeconds
        )
      )
    } else {
      try {
        // `f` is by-name, so the Future is constructed only once the permit
        // is held.
        val future = f
        future.andThen { case _ => semaphore.release() }(ec)
      } catch {
        case NonFatal(e) =>
          // `f` threw synchronously, so no Future exists to carry the release
          // callback; release the permit here instead.
          semaphore.release()
          Future.failed(e)
      }
    }
  }

  /** Returns the number of permits currently available. Useful for monitoring
    * and debugging.
    */
  def availablePermits: Int = semaphore.availablePermits()
}

/** Failure carried by the failed Future that `ConcurrencyLimiter.apply`
  * returns when no permit becomes available before the timeout expires.
  */
case class ConcurrencyLimitExceeded(
    maxConcurrent: Int,
    timeoutSeconds: Long
) extends RuntimeException(
      s"Concurrency limit ($maxConcurrent) exceeded, " +
        s"timed out after ${timeoutSeconds}s waiting for permit"
    )
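
The class comment recommends a dedicated blocking dispatcher for callers of this limiter. The sketch below shows one way that could look; it is a variation, not the implementation in this PR. The helper `limitedOn`, the object `DedicatedPoolSketch`, and the `blockingEc` parameter are hypothetical names. The blocking `tryAcquire` runs on a separate pool, so the calling thread (for example an Akka actor thread) is not parked while waiting for a permit.

```scala
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}

object DedicatedPoolSketch {

  // Hypothetical helper: waits for a permit on `blockingEc`, then runs `f`
  // with callbacks on the caller's `ec`.
  def limitedOn[T](
      semaphore: Semaphore,
      timeoutSeconds: Long,
      blockingEc: ExecutionContext
  )(f: => Future[T])(implicit ec: ExecutionContext): Future[T] =
    Future(semaphore.tryAcquire(timeoutSeconds, TimeUnit.SECONDS))(blockingEc)
      .flatMap {
        case true =>
          // Evaluating `f` inside flatMap turns a synchronous throw into a
          // failed Future, so the release callback below still runs.
          Future.unit
            .flatMap(_ => f)
            .andThen { case _ => semaphore.release() }
        case false =>
          Future.failed(
            new RuntimeException(s"No permit within ${timeoutSeconds}s")
          )
      }
}
```

A caller would size the blocking pool separately from the main dispatcher, for instance with `ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(n))`, and shut it down with the application. The trade-off is that waiting requests now queue on the blocking pool instead of applying backpressure directly to the caller.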