Skip to content

Commit 9f6d0ca

Browse files
committed
fix kragniz#1107; missing DELETE event
1 parent 4778240 commit 9f6d0ca

2 files changed

Lines changed: 65 additions & 55 deletions

File tree

etcd3/locks.py

Lines changed: 55 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1+
import threading
2+
import time
13
import uuid
24

3-
import tenacity
4-
5-
from etcd3 import exceptions
5+
from etcd3 import events
66

77

88
class Lock(object):
@@ -55,54 +55,61 @@ def acquire(self, timeout=10):
5555
:returns: True if the lock has been acquired, False otherwise.
5656
5757
"""
58-
stop = (
59-
tenacity.stop_never
60-
if timeout is None else tenacity.stop_after_delay(timeout)
61-
)
58+
if timeout is not None:
59+
deadline = time.time() + timeout
6260

63-
def wait(previous_attempt_number, delay_since_first_attempt):
64-
if timeout is None:
65-
remaining_timeout = None
66-
else:
67-
remaining_timeout = max(timeout - delay_since_first_attempt, 0)
68-
# TODO(jd): Wait for a DELETE event to happen: that'd mean the lock
69-
# has been released, rather than retrying on PUT events too
70-
try:
71-
self.etcd_client.watch_once(self.key, remaining_timeout)
72-
except exceptions.WatchTimedOut:
73-
pass
74-
return 0
75-
76-
@tenacity.retry(retry=tenacity.retry_never,
77-
stop=stop,
78-
wait=wait)
79-
def _acquire():
80-
# TODO: save the created revision so we can check it later to make
81-
# sure we still have the lock
82-
83-
self.lease = self.etcd_client.lease(self.ttl)
84-
85-
success, _ = self.etcd_client.transaction(
86-
compare=[
87-
self.etcd_client.transactions.create(self.key) == 0
88-
],
89-
success=[
90-
self.etcd_client.transactions.put(self.key, self.uuid,
91-
lease=self.lease)
92-
],
93-
failure=[
94-
self.etcd_client.transactions.get(self.key)
95-
]
96-
)
97-
if success is True:
61+
while True:
62+
if self._try_acquire():
9863
return True
99-
self.lease = None
100-
raise tenacity.TryAgain
10164

102-
try:
103-
return _acquire()
104-
except tenacity.RetryError:
105-
return False
65+
if timeout is not None:
66+
remaining_timeout = max(deadline - time.time(), 0)
67+
if remaining_timeout == 0:
68+
return False
69+
else:
70+
remaining_timeout = None
71+
72+
self._wait_delete_event(remaining_timeout)
73+
74+
def _try_acquire(self):
75+
self.lease = self.etcd_client.lease(self.ttl)
76+
77+
success, metadata = self.etcd_client.transaction(
78+
compare=[
79+
self.etcd_client.transactions.create(self.key) == 0
80+
],
81+
success=[
82+
self.etcd_client.transactions.put(self.key, self.uuid,
83+
lease=self.lease)
84+
],
85+
failure=[
86+
self.etcd_client.transactions.get(self.key)
87+
]
88+
)
89+
if success is True:
90+
self.revision = metadata[0].response_put.header.revision
91+
return True
92+
self.revision = metadata[0][0][1].mod_revision
93+
self.lease = None
94+
return False
95+
96+
def _wait_delete_event(self, timeout):
97+
event_iter, cancel = self.etcd_client.watch(
98+
self.key, start_revision=self.revision + 1)
99+
100+
if timeout is not None:
101+
timer = threading.Timer(timeout, cancel)
102+
timer.start()
103+
else:
104+
timer = None
105+
106+
for event in event_iter:
107+
if isinstance(event, events.DeleteEvent):
108+
if timer is not None:
109+
timer.cancel()
110+
111+
cancel()
112+
break
106113

107114
def release(self):
108115
"""Release the lock."""

tests/test_etcd3.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -847,15 +847,15 @@ def test_lock_acquire_with_timeout(self, etcd):
847847
lock1 = etcd.lock('lock-10', ttl=10)
848848
lock2 = etcd.lock('lock-10', ttl=10)
849849

850-
original_watch_once = etcd.watch_once
851-
watch_once_called = [0]
850+
original_watch = etcd.watch
851+
watch_called = [0]
852852

853853
def release_lock_before_watch(*args, **kwargs):
854-
watch_once_called[0] += 1
854+
watch_called[0] += 1
855855
# Simulates the case where key is expired before watch is called.
856856
# See https://github.com/kragniz/python-etcd3/issues/1107
857857
lock1.release()
858-
return original_watch_once(*args, **kwargs)
858+
return original_watch(*args, **kwargs)
859859

860860
original_transaction = etcd.transaction
861861
transaction_called = [0]
@@ -865,14 +865,17 @@ def transaction_wrapper(*args, **kwargs):
865865
return original_transaction(*args, **kwargs)
866866

867867
assert lock1.acquire() is True
868-
with mock.patch.object(etcd3.Etcd3Client, 'watch_once',
868+
with mock.patch.object(etcd3.Etcd3Client, 'watch',
869869
wraps=release_lock_before_watch):
870870
with mock.patch.object(etcd3.Etcd3Client, 'transaction',
871871
wraps=transaction_wrapper):
872872
assert lock2.acquire(timeout=5) is True
873873

874-
assert watch_once_called[0] == 1
875-
assert transaction_called[0] == 2
874+
# watch must be called only for lock2 once
875+
assert watch_called[0] == 1
876+
877+
# transaction must be called once for lock1, twice for lock2
878+
assert transaction_called[0] == 3
876879

877880
def test_internal_exception_on_internal_error(self, etcd):
878881
exception = self.MockedException(grpc.StatusCode.INTERNAL)

0 commit comments

Comments
 (0)