Skip to content

Commit c2e2216

Browse files
authored
transaction: Add "scan_locks" & "resolve_locks" interface for transaction client (#524)
Signed-off-by: Ping Yu <yuping@pingcap.com>
1 parent 061c8d1 commit c2e2216

5 files changed

Lines changed: 285 additions & 30 deletions

File tree

src/request/keyspace.rs

Lines changed: 131 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,32 @@ impl TruncateKeyspace for Vec<crate::proto::kvrpcpb::LockInfo> {
182182
}
183183
}
184184

185+
impl EncodeKeyspace for Vec<crate::proto::kvrpcpb::LockInfo> {
186+
fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
187+
if !matches!(keyspace, Keyspace::Enable { .. }) {
188+
return self;
189+
}
190+
for lock in &mut self {
191+
take_mut::take(&mut lock.key, |key| {
192+
Key::from(key).encode_keyspace(keyspace, key_mode).into()
193+
});
194+
take_mut::take(&mut lock.primary_lock, |primary| {
195+
Key::from(primary)
196+
.encode_keyspace(keyspace, key_mode)
197+
.into()
198+
});
199+
for secondary in lock.secondaries.iter_mut() {
200+
take_mut::take(secondary, |secondary| {
201+
Key::from(secondary)
202+
.encode_keyspace(keyspace, key_mode)
203+
.into()
204+
});
205+
}
206+
}
207+
self
208+
}
209+
}
210+
185211
fn keyspace_prefix(keyspace_id: u32, key_mode: KeyMode) -> [u8; KEYSPACE_PREFIX_LEN] {
186212
let mut prefix = keyspace_id.to_be_bytes();
187213
prefix[0] = match key_mode {
@@ -273,23 +299,103 @@ mod tests {
273299
mutation.encode_keyspace(keyspace, key_mode),
274300
expected_mutation
275301
);
302+
303+
let key_mode = KeyMode::Txn;
304+
let lock = crate::proto::kvrpcpb::LockInfo {
305+
key: vec![b'k', b'1'],
306+
primary_lock: vec![b'p', b'1'],
307+
secondaries: vec![vec![b's', b'1'], vec![b's', b'2']],
308+
..Default::default()
309+
};
310+
let locks = vec![lock].encode_keyspace(keyspace, key_mode);
311+
assert_eq!(locks.len(), 1);
312+
assert_eq!(locks[0].key, vec![b'x', 0, 0xDE, 0xAD, b'k', b'1']);
313+
assert_eq!(locks[0].primary_lock, vec![b'x', 0, 0xDE, 0xAD, b'p', b'1']);
314+
assert_eq!(
315+
locks[0].secondaries,
316+
vec![
317+
vec![b'x', 0, 0xDE, 0xAD, b's', b'1'],
318+
vec![b'x', 0, 0xDE, 0xAD, b's', b'2']
319+
]
320+
);
276321
}
277322

278323
#[test]
279324
fn test_truncate_version() {
280-
let key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
281325
let keyspace = Keyspace::Enable {
282326
keyspace_id: 0xDEAD,
283327
};
328+
329+
let key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
284330
let expected_key = Key::from(vec![0xBE, 0xEF]);
285331
assert_eq!(key.truncate_keyspace(keyspace), expected_key);
286332

287333
let key = Key::from(vec![b'x', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
288-
let keyspace = Keyspace::Enable {
289-
keyspace_id: 0xDEAD,
290-
};
291334
let expected_key = Key::from(vec![0xBE, 0xEF]);
292335
assert_eq!(key.truncate_keyspace(keyspace), expected_key);
336+
337+
let pair = KvPair(Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k']), vec![b'v']);
338+
let expected_pair = KvPair(Key::from(vec![b'k']), vec![b'v']);
339+
assert_eq!(pair.truncate_keyspace(keyspace), expected_pair);
340+
341+
let range = Range {
342+
start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'a']),
343+
end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'b']),
344+
};
345+
let expected_range = Range {
346+
start: Key::from(vec![b'a']),
347+
end: Key::from(vec![b'b']),
348+
};
349+
assert_eq!(range.truncate_keyspace(keyspace), expected_range);
350+
351+
let ranges = vec![
352+
Range {
353+
start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'a']),
354+
end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'b']),
355+
},
356+
Range {
357+
start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'c']),
358+
end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'd']),
359+
},
360+
];
361+
let expected_ranges = vec![
362+
Range {
363+
start: Key::from(vec![b'a']),
364+
end: Key::from(vec![b'b']),
365+
},
366+
Range {
367+
start: Key::from(vec![b'c']),
368+
end: Key::from(vec![b'd']),
369+
},
370+
];
371+
assert_eq!(ranges.truncate_keyspace(keyspace), expected_ranges);
372+
373+
let pairs = vec![
374+
KvPair(Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k']), vec![b'v']),
375+
KvPair(
376+
Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k', b'2']),
377+
vec![b'v', b'2'],
378+
),
379+
];
380+
let expected_pairs = vec![
381+
KvPair(Key::from(vec![b'k']), vec![b'v']),
382+
KvPair(Key::from(vec![b'k', b'2']), vec![b'v', b'2']),
383+
];
384+
assert_eq!(pairs.truncate_keyspace(keyspace), expected_pairs);
385+
386+
let lock = crate::proto::kvrpcpb::LockInfo {
387+
key: vec![b'x', 0, 0xDE, 0xAD, b'k'],
388+
primary_lock: vec![b'x', 0, 0xDE, 0xAD, b'p'],
389+
secondaries: vec![vec![b'x', 0, 0xDE, 0xAD, b's']],
390+
..Default::default()
391+
};
392+
let expected_lock = crate::proto::kvrpcpb::LockInfo {
393+
key: vec![b'k'],
394+
primary_lock: vec![b'p'],
395+
secondaries: vec![vec![b's']],
396+
..Default::default()
397+
};
398+
assert_eq!(vec![lock].truncate_keyspace(keyspace), vec![expected_lock]);
293399
}
294400

295401
#[test]
@@ -320,6 +426,27 @@ mod tests {
320426
mutation.clone().encode_keyspace(keyspace, key_mode),
321427
mutation
322428
);
429+
430+
let lock = crate::proto::kvrpcpb::LockInfo {
431+
key: vec![b'x', 0, 0, 0, b'k'],
432+
primary_lock: vec![b'x', 0, 0, 0, b'p'],
433+
secondaries: vec![vec![b'x', 0, 0, 0, b's']],
434+
..Default::default()
435+
};
436+
let locks = vec![lock];
437+
assert_eq!(locks.clone().encode_keyspace(keyspace, key_mode), locks);
438+
439+
let lock = crate::proto::kvrpcpb::LockInfo {
440+
key: vec![b'k', b'1'],
441+
primary_lock: vec![b'p', b'1'],
442+
secondaries: vec![vec![b's', b'1']],
443+
..Default::default()
444+
};
445+
let locks = vec![lock.clone()];
446+
assert_eq!(
447+
locks.clone().encode_keyspace(Keyspace::Disable, key_mode),
448+
locks
449+
);
323450
}
324451

325452
#[test]

src/request/plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl<Req: KvRequest + StoreRequest> StoreRequest for Dispatch<Req> {
8888
const MULTI_REGION_CONCURRENCY: usize = 16;
8989
const MULTI_STORES_CONCURRENCY: usize = 16;
9090

91-
fn is_grpc_error(e: &Error) -> bool {
91+
pub(crate) fn is_grpc_error(e: &Error) -> bool {
9292
matches!(e, Error::GrpcAPI(_) | Error::Grpc(_))
9393
}
9494

src/transaction/client.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
99
use crate::config::Config;
1010
use crate::pd::PdClient;
1111
use crate::pd::PdRpcClient;
12+
use crate::proto::kvrpcpb;
1213
use crate::proto::pdpb::Timestamp;
1314
use crate::request::plan::CleanupLocksResult;
1415
use crate::request::EncodeKeyspace;
@@ -19,6 +20,7 @@ use crate::timestamp::TimestampExt;
1920
use crate::transaction::lock::ResolveLocksOptions;
2021
use crate::transaction::lowering::new_scan_lock_request;
2122
use crate::transaction::lowering::new_unsafe_destroy_range_request;
23+
use crate::transaction::resolve_locks;
2224
use crate::transaction::ResolveLocksContext;
2325
use crate::transaction::Snapshot;
2426
use crate::transaction::Transaction;
@@ -294,9 +296,7 @@ impl Client {
294296
plan.execute().await
295297
}
296298

297-
// For test.
298299
// Note: `batch_size` must be >= expected number of locks.
299-
#[cfg(feature = "integration-tests")]
300300
pub async fn scan_locks(
301301
&self,
302302
safepoint: &Timestamp,
@@ -314,6 +314,42 @@ impl Client {
314314
Ok(plan.execute().await?.truncate_keyspace(self.keyspace))
315315
}
316316

317+
/// Resolves the given locks and returns any that remain live.
318+
///
319+
/// This method retries until either all locks are resolved or the provided
320+
/// `backoff` is exhausted. The `timestamp` is used as the caller start
321+
/// timestamp when checking transaction status.
322+
pub async fn resolve_locks(
323+
&self,
324+
locks: Vec<kvrpcpb::LockInfo>,
325+
timestamp: Timestamp,
326+
mut backoff: Backoff,
327+
) -> Result<Vec<kvrpcpb::LockInfo>> {
328+
use crate::request::TruncateKeyspace;
329+
330+
let mut live_locks = locks;
331+
loop {
332+
let resolved_locks = resolve_locks(
333+
live_locks.encode_keyspace(self.keyspace, KeyMode::Txn),
334+
timestamp.clone(),
335+
self.pd.clone(),
336+
self.keyspace,
337+
)
338+
.await?;
339+
live_locks = resolved_locks.truncate_keyspace(self.keyspace);
340+
if live_locks.is_empty() {
341+
return Ok(live_locks);
342+
}
343+
344+
match backoff.next_delay_duration() {
345+
None => return Ok(live_locks),
346+
Some(delay_duration) => {
347+
tokio::time::sleep(delay_duration).await;
348+
}
349+
}
350+
}
351+
}
352+
317353
/// Cleans up all keys in a range and quickly reclaim disk space.
318354
///
319355
/// The range can span over multiple regions.

0 commit comments

Comments
 (0)