public interface DistributedLockRepository extends ReactiveCrudRepository<DistributedLock, String> {
@Query("SELECT * FROM distributed_lock WHERE lock_key = :#{[0]} for update ")
Mono<DistributedLock> findByLockKey(String lockKey);
}
@Override
public boolean acquireLock(DistributedLockDO distributedLockDO) {
try {
return Boolean.TRUE.equals(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey())
.publishOn(Schedulers.boundedElastic()).map(distributedLock -> {
if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
&& !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
&& System.currentTimeMillis() < distributedLock.getExpireTime()) {
return false;
}
distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
if (distributedLock != null) {
if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
distributedLock.setLockValue(distributedLockDO.getLockValue());
}
distributedLock.setNewLock(false);
return distributedLockRepository.save(distributedLock).block() != null;
}
distributedLock = new DistributedLock();
distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
return distributedLockRepository.save(distributedLock).block() != null;
}).as(operator::transactional).block());
} catch (R2dbcDataIntegrityViolationException e) {
// being scrambled by other threads to succeed
return false;
}
}
Caused by: io.r2dbc.spi.R2dbcTimeoutException: Lock wait timeout exceeded; try restarting transaction
at dev.miku.r2dbc.mysql.ExceptionFactory.createException(ExceptionFactory.java:69)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ SQL "UPDATE distributed_lock SET lock_value = ?, expire = ? WHERE distributed_lock.lock_key = ?" [DatabaseClient]
Original Stack Trace:
at dev.miku.r2dbc.mysql.ExceptionFactory.createException(ExceptionFactory.java:69)
at dev.miku.r2dbc.mysql.TextQueryHandler.accept(QueryFlow.java:317)
at dev.miku.r2dbc.mysql.TextQueryHandler.accept(QueryFlow.java:292)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:176)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at dev.miku.r2dbc.mysql.util.DiscardOnCancelSubscriber.onNext(DiscardOnCancelSubscriber.java:70)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:126)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:265)
at dev.miku.r2dbc.mysql.client.ReactorNettyClient$ResponseSink.next(ReactorNettyClient.java:340)
at dev.miku.r2dbc.mysql.client.ReactorNettyClient.lambda$new$0(ReactorNettyClient.java:103)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at dev.miku.r2dbc.mysql.client.MessageDuplexCodec.handleDecoded(MessageDuplexCodec.java:187)
at dev.miku.r2dbc.mysql.client.MessageDuplexCodec.channelRead(MessageDuplexCodec.java:95)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
<==
When using the above code, the transaction will fail, and querying the data first and then updating it will fail due to the inability to get an x-lock on the database
When using the above code, the transaction will fail, and querying the data first and then updating it will fail due to the inability to get an x-lock on the database